use crate::core::{
RobustnessSemantics, SignalIdentifier, StlOperatorAndSignalIdentifier, StlOperatorTrait,
TimeInterval,
};
use crate::ring_buffer::{RingBufferTrait, Step, guarded_prune};
use std::collections::{BTreeSet, HashSet};
use std::fmt::{Debug, Display};
use std::time::Duration;
fn is_lemire_eviction_safe(
back_ts: Duration,
new_ts: Duration,
interval: &TimeInterval,
eval_buffer: &BTreeSet<Duration>,
) -> bool {
let Some(&oldest) = eval_buffer.first() else {
return true; };
if oldest >= new_ts.saturating_sub(interval.end) {
return true;
}
if oldest > back_ts.saturating_sub(interval.start) {
return true;
}
false
}
fn pop_dominated_values<C, Y>(
cache: &mut C,
sub_step: &Step<Y>,
is_max: bool,
interval: &TimeInterval,
eval_buffer: &BTreeSet<Duration>,
is_eager: bool,
) where
C: RingBufferTrait<Value = Y>,
Y: RobustnessSemantics + Debug,
{
while let Some(back) = cache.get_back() {
if !Y::prune_dominated(back.value.clone(), sub_step.value.clone(), is_max) {
break; }
if !is_eager
&& !is_lemire_eviction_safe(back.timestamp, sub_step.timestamp, interval, eval_buffer)
{
break; }
cache.pop_back();
}
}
#[derive(Clone)]
pub struct Eventually<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> {
interval: TimeInterval,
operand: Box<dyn StlOperatorAndSignalIdentifier<T, Y>>,
cache: C,
eval_buffer: BTreeSet<Duration>,
max_lookahead: Duration,
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> Eventually<T, C, Y, IS_EAGER, IS_ROSI> {
pub fn new(
interval: TimeInterval,
operand: Box<dyn StlOperatorAndSignalIdentifier<T, Y>>,
cache: Option<C>,
eval_buffer: Option<BTreeSet<Duration>>,
) -> Self
where
T: Clone + 'static,
C: RingBufferTrait<Value = Y> + Clone + 'static,
Y: RobustnessSemantics + 'static,
{
let max_lookahead = interval.end + operand.get_max_lookahead();
#[cfg(feature = "track-cache-size")]
{
let mut c = cache.unwrap_or_else(|| C::new());
c.set_tracked(true); Eventually {
interval,
operand,
cache: c,
eval_buffer: eval_buffer.unwrap_or_default(),
max_lookahead,
}
}
#[cfg(not(feature = "track-cache-size"))]
{
let c = cache.unwrap_or_else(|| C::new());
Eventually {
interval,
operand,
cache: c,
eval_buffer: eval_buffer.unwrap_or_default(),
max_lookahead,
}
}
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> StlOperatorTrait<T>
for Eventually<T, C, Y, IS_EAGER, IS_ROSI>
where
T: Clone + 'static,
C: RingBufferTrait<Value = Y> + Clone + 'static,
Y: RobustnessSemantics + Debug + 'static,
{
type Output = Y;
fn get_max_lookahead(&self) -> Duration {
self.max_lookahead
}
fn update(&mut self, step: &Step<T>) -> Vec<Step<Self::Output>> {
let sub_robustness_vec = self.operand.update(step);
let mut output_robustness = Vec::new();
if IS_ROSI {
for sub_step in sub_robustness_vec {
self.eval_buffer.insert(sub_step.timestamp);
if !self.cache.update_step(sub_step.clone()) {
let is_new_step = match self.cache.get_back() {
Some(back) => sub_step.timestamp > back.timestamp,
None => true,
};
if is_new_step {
pop_dominated_values(
&mut self.cache,
&sub_step,
true,
&self.interval,
&self.eval_buffer,
false, ); self.cache.add_step(sub_step);
}
}
}
} else {
for sub_step in &sub_robustness_vec {
self.eval_buffer.insert(sub_step.timestamp);
pop_dominated_values(
&mut self.cache,
sub_step,
true,
&self.interval,
&self.eval_buffer,
IS_EAGER, ); self.cache.add_step(sub_step.clone());
}
}
let mut tasks_to_remove = Vec::new();
let current_time = step.timestamp;
for &t_eval in self.eval_buffer.iter() {
let window_start = t_eval + self.interval.start;
let window_end = t_eval + self.interval.end;
let windowed_max_value = self
.cache
.iter()
.skip_while(|entry| entry.timestamp < window_start)
.take_while(|entry| entry.timestamp <= window_end)
.map(|entry| entry.value.clone())
.fold(Y::eventually_identity(), Y::or);
let final_value: Option<Y>;
let mut remove_task = false;
let t = if IS_ROSI {
self.cache
.get_back()
.map(|s| s.timestamp)
.unwrap_or(Duration::ZERO)
} else {
current_time
};
if t >= t_eval + self.max_lookahead {
final_value = Some(windowed_max_value);
remove_task = true;
} else if IS_EAGER && windowed_max_value == Y::atomic_true() {
final_value = Some(windowed_max_value);
remove_task = true;
} else if IS_ROSI {
let intermediate_value = Y::or(windowed_max_value, Y::unknown()); final_value = Some(intermediate_value);
} else {
break;
}
if let Some(val) = final_value {
output_robustness.push(Step::new("output", val, t_eval));
}
if remove_task {
tasks_to_remove.push(t_eval);
}
}
let protected_ts = self.eval_buffer.first().copied().unwrap_or(Duration::ZERO);
let lookahead = self.get_max_lookahead();
guarded_prune(&mut self.cache, lookahead, protected_ts);
for t in tasks_to_remove {
self.eval_buffer.remove(&t);
}
output_robustness
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> SignalIdentifier
for Eventually<T, C, Y, IS_EAGER, IS_ROSI>
{
fn get_signal_identifiers(&mut self) -> HashSet<&'static str> {
self.operand.get_signal_identifiers()
}
}
#[derive(Clone)]
pub struct Globally<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> {
interval: TimeInterval,
operand: Box<dyn StlOperatorAndSignalIdentifier<T, Y> + 'static>,
cache: C,
eval_buffer: BTreeSet<Duration>,
max_lookahead: Duration,
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> Globally<T, C, Y, IS_EAGER, IS_ROSI> {
pub fn new(
interval: TimeInterval,
operand: Box<dyn StlOperatorAndSignalIdentifier<T, Y>>,
cache: Option<C>,
eval_buffer: Option<BTreeSet<Duration>>,
) -> Self
where
T: Clone + 'static,
C: RingBufferTrait<Value = Y> + Clone + 'static,
Y: RobustnessSemantics + 'static,
{
let max_lookahead = interval.end + operand.get_max_lookahead();
#[cfg(feature = "track-cache-size")]
{
let mut c = cache.unwrap_or_else(|| C::new());
c.set_tracked(true); Globally {
interval,
operand,
cache: c,
eval_buffer: eval_buffer.unwrap_or_default(),
max_lookahead,
}
}
#[cfg(not(feature = "track-cache-size"))]
{
let c = cache.unwrap_or_else(|| C::new());
Globally {
interval,
operand,
cache: c,
eval_buffer: eval_buffer.unwrap_or_default(),
max_lookahead,
}
}
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> StlOperatorTrait<T>
for Globally<T, C, Y, IS_EAGER, IS_ROSI>
where
T: Clone + 'static,
C: RingBufferTrait<Value = Y> + Clone + 'static,
Y: RobustnessSemantics + Debug + 'static,
{
type Output = Y;
fn get_max_lookahead(&self) -> Duration {
self.max_lookahead
}
fn update(&mut self, step: &Step<T>) -> Vec<Step<Self::Output>> {
let sub_robustness_vec = self.operand.update(step);
let mut output_robustness = Vec::new();
if IS_ROSI {
for sub_step in sub_robustness_vec {
self.eval_buffer.insert(sub_step.timestamp);
if !self.cache.update_step(sub_step.clone()) {
let is_new_step = match self.cache.get_back() {
Some(back) => sub_step.timestamp > back.timestamp,
None => true,
};
if is_new_step {
pop_dominated_values(
&mut self.cache,
&sub_step,
false,
&self.interval,
&self.eval_buffer,
false, ); self.cache.add_step(sub_step);
}
}
}
} else {
for sub_step in &sub_robustness_vec {
pop_dominated_values(
&mut self.cache,
sub_step,
false,
&self.interval,
&self.eval_buffer,
IS_EAGER, ); self.eval_buffer.insert(sub_step.timestamp);
self.cache.add_step(sub_step.clone());
}
}
let mut tasks_to_remove = Vec::new();
let current_time = step.timestamp;
for &t_eval in self.eval_buffer.iter() {
let window_start = t_eval + self.interval.start;
let window_end = t_eval + self.interval.end;
let windowed_min_value = self
.cache
.iter()
.skip_while(|entry| entry.timestamp < window_start)
.take_while(|entry| entry.timestamp <= window_end)
.map(|entry| entry.value.clone())
.fold(Y::globally_identity(), Y::and);
let final_value: Option<Y>;
let mut remove_task = false;
let t = if IS_ROSI {
self.cache
.get_back()
.map(|s| s.timestamp)
.unwrap_or(Duration::ZERO)
} else {
current_time
};
if t >= t_eval + self.max_lookahead {
final_value = Some(windowed_min_value);
remove_task = true;
} else if IS_EAGER && windowed_min_value == Y::atomic_false() {
final_value = Some(windowed_min_value);
remove_task = true;
} else if IS_ROSI {
let intermediate_value = Y::and(windowed_min_value, Y::unknown());
final_value = Some(intermediate_value);
} else {
break;
}
if let Some(val) = final_value {
output_robustness.push(Step::new("output", val, t_eval));
}
if remove_task {
tasks_to_remove.push(t_eval);
}
}
let protected_ts = self.eval_buffer.first().copied().unwrap_or(Duration::ZERO);
let lookahead = self.get_max_lookahead();
guarded_prune(&mut self.cache, lookahead, protected_ts);
for t in tasks_to_remove {
self.eval_buffer.remove(&t);
}
output_robustness
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> SignalIdentifier
for Globally<T, C, Y, IS_EAGER, IS_ROSI>
{
fn get_signal_identifiers(&mut self) -> HashSet<&'static str> {
self.operand.get_signal_identifiers()
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> Display
for Globally<T, Y, C, IS_EAGER, IS_ROSI>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"G[{}, {}]({})",
self.interval.start.as_secs_f64(),
self.interval.end.as_secs_f64(),
self.operand
)
}
}
impl<T, C, Y, const IS_EAGER: bool, const IS_ROSI: bool> Display
for Eventually<T, C, Y, IS_EAGER, IS_ROSI>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"F[{}, {}]({})",
self.interval.start.as_secs_f64(),
self.interval.end.as_secs_f64(),
self.operand
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{StlOperatorTrait, TimeInterval};
use crate::operators::atomic_operators::Atomic;
use crate::ring_buffer::RingBuffer;
use crate::step;
use pretty_assertions::assert_eq;
use std::time::Duration;
#[test]
fn eventually_operator_robustness() {
let interval = TimeInterval {
start: Duration::from_secs(0),
end: Duration::from_secs(4),
};
let atomic = Atomic::<f64>::new_greater_than("x", 10.0);
let mut eventually = Eventually::<f64, RingBuffer<f64>, f64, false, false>::new(
interval,
Box::new(atomic),
None,
None,
);
eventually.get_signal_identifiers();
let signal_values = vec![15.0, 12.0, 8.0, 5.0, 12.0];
let signal_timestamps = vec![0, 2, 4, 6, 8];
let signal: Vec<_> = signal_values
.into_iter()
.zip(signal_timestamps)
.map(|(val, ts)| step!("x", val, Duration::from_secs(ts)))
.collect();
let mut all_outputs = Vec::new();
for s in &signal {
all_outputs.extend(eventually.update(s));
}
let expected_outputs = [
step!("output", 5.0, Duration::from_secs(0)),
step!("output", 2.0, Duration::from_secs(2)),
step!("output", 2.0, Duration::from_secs(4)),
];
assert_eq!(all_outputs.len(), expected_outputs.len());
for (output, expected) in all_outputs.iter().zip(expected_outputs.iter()) {
assert_eq!(output.timestamp, expected.timestamp);
assert!(
(output.value - expected.value).abs() < 1e-9,
"left: {}, right: {}",
output.value,
expected.value
);
}
}
#[test]
fn globally_operator_robustness() {
let interval = TimeInterval {
start: Duration::from_secs(0),
end: Duration::from_secs(4),
};
let atomic = Atomic::<f64>::new_greater_than("x", 10.0);
let mut globally = Globally::<f64, RingBuffer<f64>, f64, false, false>::new(
interval,
Box::new(atomic),
None,
None,
);
globally.get_signal_identifiers();
let signal_values = vec![15.0, 12.0, 8.0, 5.0, 12.0];
let signal_timestamps = vec![0, 2, 4, 6, 8];
let signal: Vec<_> = signal_values
.into_iter()
.zip(signal_timestamps)
.map(|(val, ts)| step!("x", val, Duration::from_secs(ts)))
.collect();
let mut all_outputs = Vec::new();
for s in &signal {
all_outputs.extend(globally.update(s));
}
let expected_outputs = [
step!("output", -2.0, Duration::from_secs(0)),
step!("output", -5.0, Duration::from_secs(2)),
step!("output", -5.0, Duration::from_secs(4)),
];
assert_eq!(all_outputs.len(), expected_outputs.len());
for (output, expected) in all_outputs.iter().zip(expected_outputs.iter()) {
assert_eq!(output.timestamp, expected.timestamp);
assert!(
(output.value - expected.value).abs() < 1e-9,
"left: {}, right: {}",
output.value,
expected.value
);
}
}
#[test]
fn unary_temporal_signal_identifiers() {
let interval = TimeInterval {
start: Duration::from_secs(0),
end: Duration::from_secs(4),
};
let atomic = Atomic::<f64>::new_greater_than("x", 10.0);
let mut globally = Globally::<f64, RingBuffer<f64>, f64, false, false>::new(
interval,
Box::new(atomic),
None,
None,
);
let ids = globally.get_signal_identifiers();
let expected_ids: HashSet<&'static str> = vec!["x"].into_iter().collect();
assert_eq!(ids, expected_ids);
}
#[test]
fn globally_display() {
let interval = TimeInterval {
start: Duration::from_secs(1),
end: Duration::from_secs(5),
};
let atomic = Atomic::<f64>::new_greater_than("x", 10.0);
let globally = Globally::<f64, RingBuffer<f64>, f64, false, false>::new(
interval,
Box::new(atomic),
None,
None,
);
assert_eq!(format!("{globally}"), "G[1, 5](x > 10)");
}
#[test]
fn eventually_display() {
let interval = TimeInterval {
start: Duration::from_secs(0),
end: Duration::from_secs(3),
};
let atomic = Atomic::<f64>::new_less_than("y", 5.0);
let eventually = Eventually::<f64, RingBuffer<f64>, f64, false, false>::new(
interval,
Box::new(atomic),
None,
None,
);
assert_eq!(format!("{eventually}"), "F[0, 3](y < 5)");
}
}
#[cfg(test)]
mod sparse_timestamp_tests {
use super::*;
use crate::core::{RobustnessInterval, StlOperatorTrait, TimeInterval};
use crate::operators::atomic_operators::Atomic;
use crate::ring_buffer::{RingBuffer, Step};
use crate::step;
use std::time::Duration;
fn secs(s: u64) -> Duration {
Duration::from_secs(s)
}
fn g02_globally_f64() -> Globally<f64, RingBuffer<f64>, f64, false, false> {
let interval = TimeInterval {
start: secs(0),
end: secs(2),
};
let atomic = Atomic::<f64>::new_greater_than("x", 3.0);
Globally::new(interval, Box::new(atomic), None, None)
}
fn g02_globally_rosi()
-> Globally<f64, RingBuffer<RobustnessInterval>, RobustnessInterval, true, true> {
let interval = TimeInterval {
start: secs(0),
end: secs(2),
};
let atomic = Atomic::<RobustnessInterval>::new_greater_than("x", 3.0);
Globally::new(interval, Box::new(atomic), None, None)
}
fn g02_globally_eager_qual() -> Globally<f64, RingBuffer<bool>, bool, true, false> {
let interval = TimeInterval {
start: secs(0),
end: secs(2),
};
let atomic = Atomic::<bool>::new_greater_than("x", 3.0);
Globally::new(interval, Box::new(atomic), None, None)
}
fn sparse_steps() -> Vec<Step<f64>> {
vec![
step!("x", 100.0, secs(0)),
step!("x", 15.0, secs(1)),
step!("x", 16.0, secs(2)),
step!("x", 2.0, secs(5)),
step!("x", 2.0, secs(10)),
]
}
fn find_output<Y>(outputs: &[Step<Y>], ts: u64) -> Y
where
Y: Copy,
{
outputs
.iter()
.find(|s| s.timestamp == secs(ts))
.unwrap_or_else(|| panic!("no output for t_eval={ts}"))
.value
}
#[test]
fn globally_sparse_timestamps() {
let mut op_f64 = g02_globally_f64();
let mut op_rosi = g02_globally_rosi();
let mut op_eager_qual = g02_globally_eager_qual();
for step in &sparse_steps() {
let outputs_f64 = op_f64.update(step);
let outputs_rosi = op_rosi.update(step);
let outputs_eager_qual = op_eager_qual.update(step);
match step.timestamp.as_secs() {
0 => {
assert!(
outputs_f64.is_empty() && outputs_eager_qual.is_empty(),
"t_eval={} expected no output, got {:?}",
step.timestamp.as_secs(),
outputs_f64
);
assert!(
outputs_rosi.len() == 1,
"t_eval={} expected no output, got {:?}",
step.timestamp.as_secs(),
outputs_rosi
);
}
1 => {
assert!(
outputs_f64.is_empty() && outputs_eager_qual.is_empty(),
"t_eval={} expected no output, got {:?}",
step.timestamp.as_secs(),
outputs_f64
);
assert!(
outputs_rosi.len() == 2,
"t_eval={} expected no output, got {:?}",
step.timestamp.as_secs(),
outputs_rosi
);
}
2 => {
assert!(
(find_output(&outputs_f64, 0) - 12.0).abs() < 1e-9,
"t_eval=0 expected 12.0, got {}",
find_output(&outputs_f64, 0)
);
let rosi_val = find_output(&outputs_rosi, 0);
assert!(
rosi_val.0 == 12.0 && rosi_val.1 == 12.0,
"t_eval=0 expected ROSI bounds to contain 12.0, got {:?}",
rosi_val
);
assert!(
find_output(&outputs_eager_qual, 0),
"t_eval=0 expected eager qual to be true, got {}",
find_output(&outputs_eager_qual, 0)
)
}
5 => {
assert!(
(find_output(&outputs_f64, 1) - 12.0).abs() < 1e-9,
"t_eval=1 expected 12.0, got {}",
find_output(&outputs_f64, 1)
);
assert!(
(find_output(&outputs_f64, 2) - 13.0).abs() < 1e-9,
"t_eval=2 expected 13.0, got {}",
find_output(&outputs_f64, 2)
);
let rosi_val_1 = find_output(&outputs_rosi, 1);
let rosi_val_2 = find_output(&outputs_rosi, 2);
let rosi_val_5 = find_output(&outputs_rosi, 5);
assert!(
rosi_val_1.0 == 12.0 && rosi_val_1.1 == 12.0,
"t_eval=1 expected ROSI bounds to contain 12.0, got {:?}",
rosi_val_1
);
assert!(
rosi_val_2.0 == 13.0 && rosi_val_2.1 == 13.0,
"t_eval=2 expected ROSI bounds to contain 13.0, got {:?}",
rosi_val_2
);
assert!(
find_output(&outputs_eager_qual, 1),
"t_eval=1 expected eager qual to be true, got {}",
find_output(&outputs_eager_qual, 1)
);
assert!(
find_output(&outputs_eager_qual, 2),
"t_eval=2 expected eager qual to be true, got {}",
find_output(&outputs_eager_qual, 2)
);
assert!(
!find_output(&outputs_eager_qual, 5),
"t_eval=5 expected eager qual to be false, got {}",
find_output(&outputs_eager_qual, 5)
);
assert!(
rosi_val_5.1 < 0.0,
"t_eval=5 expected ROSI upper bound to be negative, got {:?}",
rosi_val_5
);
}
10 => {
assert!(
(find_output(&outputs_f64, 5) + 1.0).abs() < 1e-9,
"t_eval=5 expected -1.0, got {}",
find_output(&outputs_f64, 5)
);
let rosi_val = find_output(&outputs_rosi, 5);
assert!(
rosi_val.0 == -1.0 && rosi_val.1 == -1.0,
"t_eval=5 expected ROSI bounds to contain -1.0, got {:?}",
rosi_val
);
assert!(
!find_output(&outputs_eager_qual, 10),
"t_eval=5 expected eager qual to be false, got {}",
find_output(&outputs_eager_qual, 10)
);
}
_ => panic!("unexpected output at t={}", step.timestamp.as_secs()),
}
}
}
}