1#[cfg(feature = "byte_plugin")]
2pub mod byte_plugin;
3#[cfg(feature = "csv_plugin")]
4pub mod csv_plugin;
5#[cfg(feature = "json_plugin")]
6pub mod json_plugin;
7#[cfg(feature = "log_printer")]
8pub mod log_printer;
9#[cfg(feature = "statistics_plugin")]
10pub mod statistics_plugin;
11
12use std::collections::{HashMap, HashSet};
13use std::convert::Infallible;
14use std::error::Error;
15use std::io::Write;
16use std::marker::PhantomData;
17
18use rtlola_interpreter::monitor::{VerdictRepresentation, Verdicts};
19use rtlola_interpreter::output::{NewVerdictFactory, VerdictFactory};
20use rtlola_interpreter::rtlola_frontend::tag_parser::verbosity_parser::{
21 DebugParser, StreamVerbosity, VerbosityParser,
22};
23use rtlola_interpreter::rtlola_frontend::tag_parser::TagValidator;
24use rtlola_interpreter::rtlola_frontend::{RtLolaError, RtLolaMir};
25use rtlola_interpreter::rtlola_mir::StreamReference;
26use rtlola_interpreter::time::{OutputTimeRepresentation, TimeRepresentation};
27
28#[derive(Debug)]
30pub struct VerdictRepresentationFactory<
31 MonitorOutput: VerdictRepresentation,
32 OutputTime: OutputTimeRepresentation,
33> {
34 phantom: PhantomData<(MonitorOutput, OutputTime)>,
35}
36
37impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation> Default
38 for VerdictRepresentationFactory<MonitorOutput, OutputTime>
39{
40 fn default() -> Self {
41 Self {
42 phantom: Default::default(),
43 }
44 }
45}
46
47impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation>
48 VerdictFactory<MonitorOutput, OutputTime>
49 for VerdictRepresentationFactory<MonitorOutput, OutputTime>
50{
51 type Error = Infallible;
52 type Record = (MonitorOutput, OutputTime::InnerTime);
53
54 fn get_verdict(
55 &mut self,
56 rec: MonitorOutput,
57 ts: OutputTime::InnerTime,
58 ) -> Result<Self::Record, Self::Error> {
59 Ok((rec, ts))
60 }
61}
62
63impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation>
64 NewVerdictFactory<MonitorOutput, OutputTime>
65 for VerdictRepresentationFactory<MonitorOutput, OutputTime>
66{
67 type CreationData = ();
68 type CreationError = Infallible;
69
70 fn new(_ir: &RtLolaMir, _data: Self::CreationData) -> Result<Self, Self::Error> {
71 Ok(Self {
72 phantom: Default::default(),
73 })
74 }
75}
76
77pub trait VerdictsSink<V: VerdictRepresentation, T: OutputTimeRepresentation> {
79 type Error: Error + 'static;
81 type Return;
83 type Factory: VerdictFactory<V, T>;
85
86 fn sink_verdicts(
88 &mut self,
89 verdicts: Verdicts<V, T>,
90 ) -> Result<
91 Vec<Self::Return>,
92 VerdictSinkError<Self::Error, <Self::Factory as VerdictFactory<V, T>>::Error>,
93 > {
94 let Verdicts { timed, event, ts } = verdicts;
95 timed
96 .into_iter()
97 .chain(vec![(ts, event)])
98 .map(|(ts, verdict)| self.sink_verdict(ts, verdict))
99 .collect::<Result<Vec<_>, _>>()
100 }
101 fn sink_verdict(
103 &mut self,
104 ts: <T as TimeRepresentation>::InnerTime,
105 verdict: V,
106 ) -> Result<
107 Self::Return,
108 VerdictSinkError<Self::Error, <Self::Factory as VerdictFactory<V, T>>::Error>,
109 > {
110 let verdict = self
111 .factory()
112 .get_verdict(verdict, ts)
113 .map_err(VerdictSinkError::Factory)?;
114 self.sink(verdict).map_err(VerdictSinkError::Sink)
115 }
116
117 fn sink(
119 &mut self,
120 verdict: <Self::Factory as VerdictFactory<V, T>>::Record,
121 ) -> Result<Self::Return, Self::Error>;
122
123 fn factory(&mut self) -> &mut Self::Factory;
125}
126
127#[derive(Debug)]
128pub enum VerdictSinkError<SinkError: Error + 'static, FactoryError: Error + 'static> {
130 #[allow(missing_docs)]
131 Sink(SinkError),
132 #[allow(missing_docs)]
133 Factory(FactoryError),
134}
135
136impl<SinkError: Error, FactoryError: Error> std::fmt::Display
137 for VerdictSinkError<SinkError, FactoryError>
138{
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 match self {
141 VerdictSinkError::Sink(e) => write!(f, "{}", e),
142 VerdictSinkError::Factory(e) => write!(f, "{}", e),
143 }
144 }
145}
146
147impl<SinkError: Error, FactoryError: Error> Error for VerdictSinkError<SinkError, FactoryError> {
148 fn source(&self) -> Option<&(dyn Error + 'static)> {
149 match self {
150 VerdictSinkError::Sink(e) => Some(e),
151 VerdictSinkError::Factory(e) => Some(e),
152 }
153 }
154}
155
156#[derive(Debug)]
158pub struct ByteSink<
159 W: Write,
160 Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
161 MonitorOutput: VerdictRepresentation,
162 OutputTime: OutputTimeRepresentation,
163 Verdict: Into<Vec<u8>>,
164> {
165 factory: Factory,
166 writer: W,
167 output: PhantomData<MonitorOutput>,
168 time: PhantomData<OutputTime>,
169}
170impl<
171 W: Write,
172 Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
173 MonitorOutput: VerdictRepresentation,
174 OutputTime: OutputTimeRepresentation,
175 Verdict: Into<Vec<u8>>,
176 > ByteSink<W, Factory, MonitorOutput, OutputTime, Verdict>
177{
178 pub fn new(writer: W, factory: Factory) -> Self {
180 Self {
181 factory,
182 writer,
183 output: PhantomData,
184 time: PhantomData,
185 }
186 }
187}
188
189impl<
190 W: Write,
191 Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
192 MonitorOutput: VerdictRepresentation,
193 OutputTime: OutputTimeRepresentation,
194 Verdict: Into<Vec<u8>>,
195 > VerdictsSink<MonitorOutput, OutputTime>
196 for ByteSink<W, Factory, MonitorOutput, OutputTime, Verdict>
197{
198 type Error = std::io::Error;
199 type Factory = Factory;
200 type Return = ();
201
202 fn sink(&mut self, verdict: Verdict) -> Result<Self::Return, Self::Error> {
203 let bytes: Vec<u8> = verdict.into();
204 self.writer.write_all(&bytes[..])?;
205 self.writer.flush()?;
206 Ok(())
207 }
208
209 fn factory(&mut self) -> &mut Self::Factory {
210 &mut self.factory
211 }
212}
213
214#[derive(Copy, Clone, Debug)]
216pub struct DiscardSink<
217 O: OutputTimeRepresentation,
218 V: VerdictRepresentation,
219 F: VerdictFactory<V, O>,
220> {
221 factory: F,
222 verdict: PhantomData<(O, V)>,
223}
224
225impl<
226 O: OutputTimeRepresentation,
227 V: VerdictRepresentation,
228 F: VerdictFactory<V, O, Record = ()>,
229 > DiscardSink<O, V, F>
230{
231 pub fn new(factory: F) -> Self {
233 Self {
234 factory,
235 verdict: Default::default(),
236 }
237 }
238}
239
240impl<O: OutputTimeRepresentation, V: VerdictRepresentation> Default
241 for DiscardSink<O, V, EmptyFactory>
242{
243 fn default() -> Self {
244 Self::new(EmptyFactory)
245 }
246}
247
248impl<O: OutputTimeRepresentation, V: VerdictRepresentation, F: VerdictFactory<V, O>>
249 VerdictsSink<V, O> for DiscardSink<O, V, F>
250{
251 type Error = Infallible;
252 type Factory = F;
253 type Return = ();
254
255 fn sink(
256 &mut self,
257 _verdict: <Self::Factory as VerdictFactory<V, O>>::Record,
258 ) -> Result<Self::Return, Self::Error> {
259 Ok(())
260 }
261
262 fn factory(&mut self) -> &mut Self::Factory {
263 &mut self.factory
264 }
265}
266
267#[derive(Default, Copy, Clone, Debug)]
269pub struct EmptyFactory;
270
271impl<V: VerdictRepresentation, O: OutputTimeRepresentation> VerdictFactory<V, O> for EmptyFactory {
272 type Error = Infallible;
273 type Record = ();
274
275 fn get_verdict(&mut self, _rec: V, _ts: O::InnerTime) -> Result<Self::Record, Self::Error> {
276 Ok(())
277 }
278}
279
280impl<V: VerdictRepresentation, O: OutputTimeRepresentation> NewVerdictFactory<V, O>
281 for EmptyFactory
282{
283 type CreationData = ();
284 type CreationError = Infallible;
285
286 fn new(_ir: &RtLolaMir, _data: Self::CreationData) -> Result<Self, Self::Error> {
287 Ok(Self)
288 }
289}
290
291#[derive(Debug, Clone)]
293pub struct VerbosityAnnotations {
294 stream_verbosity: HashMap<StreamReference, StreamVerbosity>,
295 debug_streams: HashSet<StreamReference>,
296}
297
298impl VerbosityAnnotations {
299 pub fn new(ir: &RtLolaMir) -> Result<VerbosityAnnotations, RtLolaError> {
301 let verbosity_parser = VerbosityParser;
302 let debug_parser = DebugParser;
303
304 let stream_verbosity_tags = ir.parse_tags(verbosity_parser)?;
305 let stream_verbosity = ir
306 .all_streams()
307 .map(|sr| (sr, *stream_verbosity_tags.local_tags(sr).unwrap()))
308 .collect();
309 let debug_tags = ir.parse_tags(debug_parser)?;
310 let debug_streams = ir
311 .all_streams()
312 .filter(|sr| *debug_tags.local_tags(*sr).unwrap())
313 .collect();
314
315 Ok(Self {
316 stream_verbosity,
317 debug_streams,
318 })
319 }
320
321 pub fn new_with_debug(
323 ir: &RtLolaMir,
324 debug_streams: &[String],
325 ) -> Result<VerbosityAnnotations, RtLolaError> {
326 let annotations = Self::new(ir)?;
327 let debug_streams = debug_streams
328 .into_iter()
329 .map(|sname| {
330 ir.get_stream_by_name(sname.as_str())
331 .ok_or_else(|| {
332 format!(
333 "stream {sname} marked for debugging, but not found in specification"
334 )
335 })
336 .map(|stream| stream.as_stream_ref())
337 })
338 .collect::<Result<Vec<_>, String>>()
339 .unwrap_or_else(|e| {
340 eprintln!("{e}");
341 std::process::exit(1);
342 });
343 Ok(annotations.add_debug_streams(&debug_streams))
344 }
345
346 pub fn parsers<'a>() -> &'a [&'a dyn TagValidator] {
348 &[&VerbosityParser, &DebugParser]
349 }
350
351 pub fn add_debug_streams(mut self, stream: &[StreamReference]) -> Self {
353 self.debug_streams.extend(stream);
354 self
355 }
356
357 pub fn verbosity(&self, sr: StreamReference) -> StreamVerbosity {
359 *self.stream_verbosity.get(&sr).unwrap()
360 }
361
362 pub fn debug(&self, sr: StreamReference) -> bool {
364 self.debug_streams.contains(&sr)
365 }
366}