pub trait VerdictRepresentation: Clone + Debug + Send + CondSerialize + 'static {
type Tracing: Tracer;
fn create(data: RawVerdict<'_>) -> Self;
fn is_empty(&self) -> bool;
fn create_with_trace(data: RawVerdict<'_>, _tracing: Self::Tracing) -> Self { ... }
}Expand description
Provides the functionality to generate a snapshot of the stream values.
Required Associated Types§
Required Methods§
sourcefn create(data: RawVerdict<'_>) -> Self
fn create(data: RawVerdict<'_>) -> Self
Creates a snapshot of the stream values.
Provided Methods§
sourcefn create_with_trace(data: RawVerdict<'_>, _tracing: Self::Tracing) -> Self
fn create_with_trace(data: RawVerdict<'_>, _tracing: Self::Tracing) -> Self
Creates a snapshot of the stream values, including tracing data.
Examples found in repository?
src/api/monitor.rs (line 422)
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
/// Evaluates the pending time-driven deadlines relative to the timestamp `ts`.
///
/// Deadlines are taken from the schedule manager one at a time. A deadline is evaluated if it
/// is due before `ts`; a deadline that is due exactly at `ts` is evaluated only if
/// `only_before` is `false`. The loop stops at the first deadline that does not qualify or
/// when the schedule manager reports no further due time.
///
/// Returns one `(due time, verdict)` pair per evaluated deadline, in evaluation order.
fn eval_deadlines(&mut self, ts: Time, only_before: bool) -> Vec<(Time, Verdict)> {
    let mut timed: Vec<(Time, Verdict)> = vec![];
    // Bind the due time directly instead of probing with `is_some()` and unwrapping a second
    // lookup afterwards.
    while let Some(due) = self.schedule_manager.get_next_due() {
        if due > ts || (only_before && due == ts) {
            break;
        }
        // The tracer is only created once it is certain that this deadline gets evaluated.
        let mut tracer = Verdict::Tracing::default();
        tracer.eval_start();
        let deadline = self.schedule_manager.get_next_deadline(ts);
        self.eval.eval_time_driven_tasks(deadline, due, &mut tracer);
        tracer.eval_end();
        timed.push((due, Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer)));
    }
    timed
}
}
/// A raw verdict that is transformed into the respective representation.
///
/// It only borrows the `Evaluator` for the lifetime `'a`, which makes it cheap to copy.
/// It is the input handed to `VerdictRepresentation::create` and
/// `VerdictRepresentation::create_with_trace`.
// NOTE(review): `Debug` is deliberately not derived; presumably `Evaluator` does not implement it — confirm.
#[allow(missing_debug_implementations)]
#[derive(Copy, Clone)]
pub struct RawVerdict<'a> {
/// The borrowed evaluator this raw verdict was created from.
eval: &'a Evaluator,
}
impl<'a> From<&'a Evaluator> for RawVerdict<'a> {
fn from(eval: &'a Evaluator) -> Self {
RawVerdict { eval }
}
}
/// This trait provides the functionality to pass inputs to the monitor.
///
/// You can either implement this trait for your own data type or use one of the predefined input methods.
/// See [`RecordInput`] and [`EventInput`].
pub trait Input: Sized {
/// The type from which an event is generated by the input source.
type Record: Send;
/// The error type returned by the input source on IO errors or parsing issues.
type Error: Error + Send + 'static;
/// Arbitrary type of the data provided to the input source at creation time.
type CreationData: Clone + Send;
/// Creates a new input source.
///
/// `map` maps the names of the inputs in the specification to their position in the event;
/// `setup_data` is the user-provided creation data.
/// Returns `Self::Error` if the input source cannot be constructed.
fn new(map: HashMap<String, InputReference>, setup_data: Self::CreationData) -> Result<Self, Self::Error>;
/// Converts the record `rec` into an event.
///
/// Returns `Self::Error` if the record cannot be converted.
fn get_event(&self, rec: Self::Record) -> Result<Event, Self::Error>;
}
/// This trait provides functionality to parse a record into an event.
///
/// It is only used in combination with the [`RecordInput`].
pub trait Record: Send {
/// Arbitrary type of the data provided at creation time to help initialize the input method.
type CreationData: Clone + Send;
/// The error returned if anything goes wrong.
type Error: Error + Send + 'static;
/// Given the `name` of an input, returns a function that, given a record, returns the value for that input.
///
/// `data` is the creation data supplied when the input method is set up.
/// Returns `Self::Error` if no projection can be provided for `name`.
fn func_for_input(name: &str, data: Self::CreationData) -> Result<ValueProjection<Self, Self::Error>, Self::Error>;
}
/// A boxed function type that projects a reference to `From` to a `Value`.
///
/// The projection is fallible and reports failures as an error of type `E`.
pub type ValueProjection<From, E> = Box<dyn (Fn(&From) -> Result<Value, E>)>;
/// An input method for types that implement the [Record] trait. Useful if you do not want to bother with the order of the input streams in an event.
/// Assuming the specification has three inputs 'a', 'b' and 'c', you could implement the [Record] trait for your custom type 'MyType' as follows:
/// ```
/// use std::fmt::Formatter;
///
/// use rtlola_interpreter::monitor::Record;
/// use rtlola_interpreter::Value;
/// #[cfg(feature = "serde")]
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Debug, Clone)]
/// struct MyError(String);
/// impl std::fmt::Display for MyError {
/// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
/// write!(f, "An error occurred: {}", self.0)
/// }
/// }
/// impl std::error::Error for MyError {}
///
/// #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
/// struct MyType {
/// a: u64,
/// b: Option<bool>,
/// c: String,
/// }
///
/// impl MyType {
/// // Generate a new value for input stream 'a'
/// fn a(rec: &Self) -> Result<Value, MyError> {
/// Ok(Value::from(rec.a))
/// }
///
/// // Generate a new value for input stream 'b'
/// fn b(rec: &Self) -> Result<Value, MyError> {
/// Ok(rec.b.map(|b| Value::from(b)).unwrap_or(Value::None))
/// }
///
/// // Generate a new value for input stream 'c'
/// fn c(rec: &Self) -> Result<Value, MyError> {
/// Ok(Value::Str(rec.c.clone().into_boxed_str()))
/// }
/// }
///
/// impl Record for MyType {
/// type CreationData = ();
/// type Error = MyError;
///
/// fn func_for_input(
/// name: &str,
/// _data: Self::CreationData,
/// ) -> Result<Box<dyn (Fn(&MyType) -> Result<Value, MyError>)>, MyError> {
/// match name {
/// "a" => Ok(Box::new(Self::a)),
/// "b" => Ok(Box::new(Self::b)),
/// "c" => Ok(Box::new(Self::c)),
/// x => {
/// Err(MyError(format!(
/// "Unexpected input stream {} in specification.",
/// x
/// )))
/// },
/// }
/// }
/// }
/// ```
// `Debug` cannot be derived here: `translators` holds boxed `dyn Fn` closures, which do not implement `Debug`.
#[allow(missing_debug_implementations)]
pub struct RecordInput<Inner: Record> {
/// One value projection per input stream, stored at the input's position in the event.
translators: Vec<ValueProjection<Inner, Inner::Error>>,
}
impl<Inner: Record> Input for RecordInput<Inner> {
    type CreationData = Inner::CreationData;
    type Error = Inner::Error;
    type Record = Inner;

    /// Builds the input source by requesting one value projection per input stream.
    ///
    /// Each projection is stored at the position that `map` assigns to the input's name.
    /// The first error returned by `Inner::func_for_input` aborts the construction.
    ///
    /// # Panics
    ///
    /// Panics if a position in `map` is not smaller than the number of inputs, or if some
    /// position below that number is not assigned to any input.
    fn new(map: HashMap<String, InputReference>, setup_data: Self::CreationData) -> Result<Self, Self::Error> {
        // Start with one empty slot per input; the slots are filled by position below.
        let mut slots: Vec<Option<_>> = std::iter::repeat_with(|| None).take(map.len()).collect();
        for (name, position) in map {
            let projection = Inner::func_for_input(&name, setup_data.clone())?;
            slots[position] = Some(projection);
        }
        Ok(Self {
            translators: slots.into_iter().map(|slot| slot.unwrap()).collect(),
        })
    }

    /// Applies every projection to the record and collects the values into an event.
    ///
    /// The first projection that fails determines the returned error.
    fn get_event(&self, rec: Inner) -> Result<Event, Self::Error> {
        let record = &rec;
        self.translators.iter().map(|project| project(record)).collect()
    }
}
/// The simplest input method to the monitor. It accepts any type that implements `Into<Event>`.
///
/// The conversion to values and the order of inputs must be handled externally.
#[derive(Debug, Clone)]
pub struct EventInput<E: Into<Event> + CondSerialize + CondDeserialize> {
/// Ties the record type `E` to this input method without storing a value of that type.
phantom: PhantomData<E>,
}
impl<E: Into<Event> + Send + CondSerialize + CondDeserialize> Input for EventInput<E> {
    type CreationData = ();
    type Error = NoError;
    type Record = E;

    /// Creates the input method; neither the input map nor the creation data is used.
    fn new(_map: HashMap<String, InputReference>, _setup_data: Self::CreationData) -> Result<Self, Self::Error> {
        let input = Self { phantom: PhantomData };
        Ok(input)
    }

    /// Turns the record into an event through its `Into<Event>` implementation.
    fn get_event(&self, rec: Self::Record) -> Result<Event, Self::Error> {
        let event: Event = rec.into();
        Ok(event)
    }
}
/// Public interface
impl<Source, SourceTime, Verdict, VerdictTime> Monitor<Source, SourceTime, Verdict, VerdictTime>
where
Source: Input,
SourceTime: TimeRepresentation,
Verdict: VerdictRepresentation,
VerdictTime: OutputTimeRepresentation,
{
/**
Evaluates all periodic streams that are due strictly before the new timestamp and then handles the input event.
The new event is therefore not seen by the periodic streams evaluated before the new timestamp.

The record `ev` is first converted into an event by the input source; a conversion error is returned unchanged.
On success, the returned verdicts contain the timed verdicts, the verdict of the event itself and the
timestamp, all expressed in the output time representation.
*/
pub fn accept_event(
&mut self,
ev: Source::Record,
ts: SourceTime::InnerTime,
) -> Result<Verdicts<Verdict, VerdictTime>, Source::Error> {
// This tracer covers parsing and the event evaluation; timed evaluations use their own tracers.
let mut tracer = Verdict::Tracing::default();
tracer.parse_start();
let ev = self.source.get_event(ev)?;
tracer.parse_end();
let ts = self.source_time.convert_from(ts);
// Evaluate timed streams with due < ts
let timed = if self.ir.has_time_driven_features() {
self.eval_deadlines(ts, true)
} else {
// No time-driven features in the specification: there is nothing to evaluate by deadline.
vec![]
};
// Evaluate
tracer.eval_start();
self.eval.eval_event(ev.as_slice(), ts, &mut tracer);
tracer.eval_end();
let event_change = Verdict::create_with_trace(RawVerdict::from(&self.eval), tracer);
// Translate the due times of the timed verdicts into the output time representation.
let timed = timed
.into_iter()
.map(|(t, v)| (self.output_time.convert_into(t), v))
.collect();
Ok(Verdicts::<Verdict, VerdictTime> {
timed,
event: event_change,
ts: self.output_time.convert_into(ts),
})
}More examples
src/api/queued.rs (line 513)
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
/// Worker loop: repeatedly receives the next work item or runs into the next deadline,
/// evaluates it and forwards the resulting verdict via `Self::try_send`.
///
/// Returns `Ok(())` once the input channel is disconnected.
///
/// # Errors
///
/// Returns `QueueError::SourceError` if the input source fails to convert a record,
/// `QueueError::MultipleStart` if a `WorkItem::Start` is received, and propagates any
/// error of `Self::try_send`.
///
/// # Panics
///
/// Panics if `output_time` has not been initialized before this method is called.
fn process(&mut self) -> Result<(), QueueError> {
let output_time = self.output_time.as_mut().expect("Init to be executed before process");
loop {
let next_deadline = self.schedule_manager.get_next_due();
// With a pending deadline the receive is bounded by it; otherwise a plain receive is used
// and its failure is mapped to `Disconnected` so both branches share one error type.
let item = if let Some(due) = next_deadline {
self.input.recv_timeout(due)
} else {
self.input.recv().map_err(|_| RecvTimeoutError::Disconnected)
};
let verdict = match item {
Ok(WorkItem::Event(e, ts)) => {
// Received Event before deadline
let e = self
.source
.get_event(e)
.map_err(|e| QueueError::SourceError(Box::new(e)))?;
let ts = self.source_time.convert_from(ts);
let mut tracer = Verdict::Tracing::default();
tracer.eval_start();
self.evaluator.eval_event(&e, ts, &mut tracer);
tracer.eval_end();
let verdict = Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
// `then_some` evaluates its argument eagerly: the timestamp is converted even if the
// verdict is empty and therefore dropped.
verdict.is_empty().not().then_some(QueuedVerdict {
kind: VerdictKind::Event,
ts: output_time.convert_into(ts),
verdict,
})
},
Err(RecvTimeoutError::Timeout) => {
// Deadline occurred before event
let mut tracer = Verdict::Tracing::default();
tracer.eval_start();
let due = next_deadline.expect("timeout to only happen for a deadline.");
let deadline = self.schedule_manager.get_next_deadline(due);
self.evaluator.eval_time_driven_tasks(deadline, due, &mut tracer);
tracer.eval_end();
let verdict = Verdict::create_with_trace(RawVerdict::from(&self.evaluator), tracer);
// Same eager evaluation as in the event branch; empty verdicts yield `None`.
verdict.is_empty().not().then_some(QueuedVerdict {
kind: VerdictKind::Timed,
ts: output_time.convert_into(due),
verdict,
})
},
Err(RecvTimeoutError::Disconnected) => {
// Channel closed, we are done here
return Ok(());
},
Ok(WorkItem::Start) => {
// Received second start command -> abort
return Err(QueueError::MultipleStart);
},
};
// `verdict` is an `Option`: `None` stands for an empty verdict.
Self::try_send(&self.output, verdict)?;
}
}