Struct rtlola_interpreter::Monitor
pub struct Monitor<Source, SourceTime, Verdict = Incremental, VerdictTime = RelativeFloat>
where
    Source: Input,
    SourceTime: TimeRepresentation,
    Verdict: VerdictRepresentation,
    VerdictTime: OutputTimeRepresentation + 'static,
{ /* private fields */ }
The Monitor is the central object exposed by the API.
The Monitor accepts new events and computes streams.
It can compute event-based streams based on new events through accept_event.
It can also simply advance periodic streams up to a given timestamp through accept_time.
The generic argument Source implements the Input trait describing the input source of the API.
The generic argument SourceTime implements the TimeRepresentation trait defining the input time format.
The generic argument Verdict implements the VerdictRepresentation trait describing the output format of the API; it defaults to Incremental.
The generic argument VerdictTime implements the OutputTimeRepresentation trait defining the output time format; it defaults to RelativeFloat.
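The defaulted generic arguments can be illustrated with a minimal, self-contained sketch. It does not use rtlola_interpreter: ToyMonitor, the Named trait, and the marker types below are hypothetical stand-ins that only mirror the shape of the signature above.

```rust
use std::marker::PhantomData;

// Hypothetical stand-ins, NOT the rtlola_interpreter types of the same name.
pub trait Named {
    const NAME: &'static str;
}
pub struct Incremental;
pub struct Total;
pub struct RelativeFloat;
impl Named for Incremental {
    const NAME: &'static str = "Incremental";
}
impl Named for Total {
    const NAME: &'static str = "Total";
}
impl Named for RelativeFloat {
    const NAME: &'static str = "RelativeFloat";
}

// Toy monitor with four generic arguments, the last two defaulted.
pub struct ToyMonitor<Source, SourceTime, Verdict = Incremental, VerdictTime = RelativeFloat> {
    _marker: PhantomData<(Source, SourceTime, Verdict, VerdictTime)>,
}

impl<S, ST, V: Named, VT: Named> ToyMonitor<S, ST, V, VT> {
    pub fn new() -> Self {
        ToyMonitor { _marker: PhantomData }
    }

    // Names of the chosen verdict and verdict-time representations.
    pub fn representations(&self) -> (&'static str, &'static str) {
        (V::NAME, VT::NAME)
    }
}

// Only Source and SourceTime are spelled out; the defaults fill in the rest.
pub fn default_representations() -> (&'static str, &'static str) {
    let monitor: ToyMonitor<(), RelativeFloat> = ToyMonitor::new();
    monitor.representations()
}

// Overriding Verdict while keeping the default VerdictTime.
pub fn explicit_representations() -> (&'static str, &'static str) {
    let monitor: ToyMonitor<(), RelativeFloat, Total> = ToyMonitor::new();
    monitor.representations()
}
```

In this sketch, a type annotation that names only the first two arguments selects Incremental and RelativeFloat, which is how Rust's default type parameters behave in type position.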
Implementations
impl<Source, SourceTime, Verdict, VerdictTime> Monitor<Source, SourceTime, Verdict, VerdictTime>
where
    Source: Input,
    SourceTime: TimeRepresentation,
    Verdict: VerdictRepresentation,
    VerdictTime: OutputTimeRepresentation,
Crate-public interface
pub fn setup(
config: Config<SourceTime, VerdictTime>,
setup_data: Source::CreationData
) -> Result<Monitor<Source, SourceTime, Verdict, VerdictTime>, Source::Error>
Sets up a Monitor from a configuration and the data needed to create the input source.
Examples found in repository?
    pub fn monitor_with_data(
        self,
        data: Source::CreationData,
    ) -> Result<Monitor<Source, SourceTime, Verdict, VerdictTime>, Source::Error> {
        Monitor::setup(self.config, data)
    }

    /// Transforms the configuration into a [Monitor]
    pub fn monitor(self) -> Result<Monitor<Source, SourceTime, Verdict, VerdictTime>, Source::Error>
    where
        Source: Input<CreationData = ()>,
    {
        Monitor::setup(self.config, ())
    }

    #[cfg(feature = "queued-api")]
    /// Transforms the configuration into a [QueuedMonitor] using the provided data to setup the input source.
    pub fn queued_monitor_with_data(
        self,
        data: Source::CreationData,
    ) -> QueuedMonitor<Source, SourceTime, Verdict, VerdictTime> {
        QueuedMonitor::setup(self.config, data)
    }

    #[cfg(feature = "queued-api")]
    /// Transforms the configuration into a [QueuedMonitor]
    pub fn queued_monitor(self) -> QueuedMonitor<Source, SourceTime, Verdict, VerdictTime>
    where
        Source: Input<CreationData = ()>,
    {
        QueuedMonitor::setup(self.config, ())
    }
}

/// The execution mode of the interpreter. See the `README` for more details.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ExecutionMode {
    /// Time provided by input source
    Offline,
    /// Time taken by evaluator
    Online,
}

impl Config<RelativeFloat, RelativeFloat> {
    /// Creates a new Debug config.
    pub fn debug(ir: RtLolaMir) -> Self {
        Config {
            ir,
            mode: ExecutionMode::Offline,
            input_time_representation: RelativeFloat::default(),
            output_time_representation: PhantomData::<RelativeFloat>::default(),
            start_time: None,
        }
    }
}

impl<InputTime: TimeRepresentation, OutputTime: OutputTimeRepresentation> Config<InputTime, OutputTime> {
    /// Creates a new API config
    pub fn api(ir: RtLolaMir) -> Self {
        Config {
            ir,
            mode: ExecutionMode::Offline,
            input_time_representation: InputTime::default(),
            output_time_representation: PhantomData::default(),
            start_time: None,
        }
    }

    /// Turn the configuration into the [Monitor] API.
    pub fn monitor<Source: Input, Verdict: VerdictRepresentation>(
        self,
        data: Source::CreationData,
    ) -> Result<Monitor<Source, InputTime, Verdict, OutputTime>, Source::Error> {
        Monitor::setup(self, data)
    }
impl<Source, SourceTime, Verdict, VerdictTime> Monitor<Source, SourceTime, Verdict, VerdictTime>
where
    Source: Input,
    SourceTime: TimeRepresentation,
    Verdict: VerdictRepresentation,
    VerdictTime: OutputTimeRepresentation,
Public interface
pub fn accept_event(
&mut self,
ev: Source::Record,
ts: SourceTime::InnerTime
) -> Result<Verdicts<Verdict, VerdictTime>, Source::Error>
Computes all periodic streams up through the new timestamp and then handles the input event.
The new event is therefore not visible to the periodic streams evaluated up through the new timestamp.
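The ordering described above can be sketched with a toy model. This is not the rtlola_interpreter implementation: PeriodicToy is a hypothetical type with a single periodic stream that reports how many events it has seen, and the sketch assumes that "up through" includes deadlines exactly at the new timestamp.

```rust
/// Toy model (hypothetical, not the rtlola_interpreter implementation):
/// one periodic stream that fires every `period` time units and reports
/// the number of events handled so far.
pub struct PeriodicToy {
    period: u64,
    next_deadline: u64,
    events_seen: u32,
}

impl PeriodicToy {
    pub fn new(period: u64) -> Self {
        assert!(period > 0, "period must be positive");
        PeriodicToy { period, next_deadline: period, events_seen: 0 }
    }

    /// Evaluates every pending deadline with time <= ts (assumed inclusive)
    /// and returns (deadline time, events seen at that deadline).
    pub fn accept_time(&mut self, ts: u64) -> Vec<(u64, u32)> {
        let mut timed = Vec::new();
        while self.next_deadline <= ts {
            timed.push((self.next_deadline, self.events_seen));
            self.next_deadline += self.period;
        }
        timed
    }

    /// First advances the periodic stream up through `ts`, then handles
    /// the event, so the deadlines returned here never see this event.
    pub fn accept_event(&mut self, ts: u64) -> Vec<(u64, u32)> {
        let timed = self.accept_time(ts);
        self.events_seen += 1;
        timed
    }
}
```

With a period of 2, an event at time 1 produces no timed results, and a second event at time 4 produces the deadlines at 2 and 4, both of which report one event seen: the event at time 4 is handled only after those deadlines.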
Examples found in repository?
fn process(&mut self) -> Result<(), QueueError> {
    let monitor = self.monitor.as_mut().expect("Init to be called before process");
    let mut last_event = None;
    let mut done = false;
    while !done {
        let timed = match self.input.recv() {
            Ok(WorkItem::Event(e, ts)) => {
                // Received Event
                last_event.replace(ts.clone());
                let Verdicts { timed, event, ts } = monitor
                    .accept_event(e, ts)
                    .map_err(|e| QueueError::SourceError(Box::new(e)))?;
                if !event.is_empty() {
                    let verdict = QueuedVerdict {
                        kind: VerdictKind::Event,
                        ts: ts.clone(),
                        verdict: event,
                    };
                    Self::try_send(&self.output, Some(verdict))?;
                }
                timed
            },
            Err(_) => {
                // Channel closed, we are done here
                done = true;
                if let Some(last_event) = last_event.as_ref() {
                    monitor.accept_time(last_event.clone())
                } else {
                    return Ok(());
                }
            },
            Ok(WorkItem::Start) => {
                // Received second start command -> abort
                return Err(QueueError::MultipleStart);
            },
        };
        for (ts, v) in timed {
            let verdict = QueuedVerdict {
                kind: VerdictKind::Timed,
                ts,
                verdict: v,
            };
            Self::try_send(&self.output, Some(verdict))?;
        }
    }
    Ok(())
}
pub fn accept_time(
&mut self,
ts: SourceTime::InnerTime
) -> Vec<(VerdictTime::InnerTime, Verdict)>
Computes all periodic streams up through and including the timestamp.
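The inclusive bound can be made concrete with a small helper. This is a sketch under stated assumptions, not the library's scheduling logic: deadlines_through is a hypothetical function, and it assumes the deadlines of a periodic stream sit at positive multiples of its period.

```rust
/// Deadlines of a periodic stream that fall in the interval (last, ts],
/// i.e. after the previously processed time and up through and including
/// `ts`. Assumes deadlines sit at positive multiples of `period`
/// (an assumption of this sketch).
pub fn deadlines_through(period: u64, last: u64, ts: u64) -> Vec<u64> {
    assert!(period > 0, "period must be positive");
    let mut deadlines = Vec::new();
    // First multiple of `period` strictly greater than `last`.
    let mut next = (last / period + 1) * period;
    while next <= ts {
        deadlines.push(next);
        next += period;
    }
    deadlines
}
```

For a period of 5, advancing from 0 to 10 yields the deadlines 5 and 10, advancing from 10 to 14 yields none, and advancing from 10 to 15 yields 15, because a deadline exactly at the timestamp is included.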
pub fn ir(&self) -> &RtLolaMir
Returns the underlying representation of the specification as an RtLolaMir.
pub fn name_for_input(&self, id: InputReference) -> &str
Get the name of an input stream based on its InputReference.
The reference is valid for the lifetime of the monitor.
pub fn name_for_output(&self, id: OutputReference) -> &str
Get the name of an output stream based on its OutputReference.
The reference is valid for the lifetime of the monitor.
pub fn trigger_message(&self, id: usize) -> &str
Get the message of a trigger based on its index.
The reference is valid for the lifetime of the monitor.
pub fn trigger_stream_index(&self, id: usize) -> usize
Get the OutputReference of a trigger based on its index.
pub fn number_of_input_streams(&self) -> usize
Get the number of input streams.
pub fn number_of_output_streams(&self) -> usize
Get the number of output streams (this includes one output stream for each trigger).
pub fn number_of_triggers(&self) -> usize
Get the number of triggers.
pub fn type_of_input(&self, id: InputReference) -> &Type
Get the type of an input stream based on its InputReference.
The reference is valid for the lifetime of the monitor.
pub fn type_of_output(&self, id: OutputReference) -> &Type
Get the type of an output stream based on its OutputReference.
The reference is valid for the lifetime of the monitor.
pub fn extend_rate_of_output(&self, id: OutputReference) -> Option<Duration>
Get the extend rate of an output stream based on its OutputReference.
The reference is valid for the lifetime of the monitor.
pub fn with_verdict_representation<T: VerdictRepresentation>(
self
) -> Monitor<Source, SourceTime, T, VerdictTime>
Switches the VerdictRepresentation of the Monitor. The call consumes the Monitor and returns one that uses the representation T.
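A consuming, type-level switch of this kind can be sketched as follows. The sketch does not use rtlola_interpreter: SwitchableToy, ToyVerdict, and the marker types are hypothetical, and carrying the internal state over unchanged is an assumption of the sketch rather than a statement about the library.

```rust
use std::marker::PhantomData;

// Hypothetical stand-ins, NOT the rtlola_interpreter traits and types.
pub trait ToyVerdict {
    fn name() -> &'static str;
}
pub struct Incremental;
pub struct Total;
impl ToyVerdict for Incremental {
    fn name() -> &'static str {
        "Incremental"
    }
}
impl ToyVerdict for Total {
    fn name() -> &'static str {
        "Total"
    }
}

/// Toy monitor whose verdict representation exists only at the type level.
pub struct SwitchableToy<V: ToyVerdict> {
    events_seen: u32,
    _verdict: PhantomData<V>,
}

impl<V: ToyVerdict> SwitchableToy<V> {
    pub fn new() -> Self {
        SwitchableToy { events_seen: 0, _verdict: PhantomData }
    }

    pub fn accept_event(&mut self) {
        self.events_seen += 1;
    }

    pub fn events_seen(&self) -> u32 {
        self.events_seen
    }

    pub fn verdict_name(&self) -> &'static str {
        V::name()
    }

    /// Consumes the monitor and rebuilds it with representation `T`.
    /// Keeping `events_seen` is an assumption of this sketch.
    pub fn with_verdict_representation<T: ToyVerdict>(self) -> SwitchableToy<T> {
        SwitchableToy { events_seen: self.events_seen, _verdict: PhantomData }
    }
}
```

Because the method takes self by value, the old binding cannot be used after the switch; the turbofish form `monitor.with_verdict_representation::<Total>()` selects the new representation.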