Struct rtlola_frontend::mir::TimeDrivenStream
source · pub struct TimeDrivenStream {
pub reference: StreamReference,
pub frequency: UOM_Frequency,
}Expand description
Wrapper for output streams providing additional information specific to time-driven streams.
Fields§
§reference: StreamReference — A reference to the stream that is specified.
frequency: UOM_Frequency — The evaluation frequency of the stream.
Implementations§
source§impl TimeDrivenStream
impl TimeDrivenStream
sourcepub fn period(&self) -> UOM_Time
pub fn period(&self) -> UOM_Time
Returns the evaluation period, i.e., the multiplicative inverse of TimeDrivenStream::frequency.
Examples found in repository?
More examples
src/mir/schedule.rs (line 74)
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
/// Derives the periodic schedule for the given intermediate representation.
///
/// The periods taken into account are those of all time-driven streams that are not spawned,
/// of all periodic spawn conditions, and of all periodic close conditions that do not have a
/// self reference. If there is no such period, a schedule without hyper period and without
/// deadlines is returned. Otherwise the extend period (gcd) and the hyper period of all
/// periods are computed, the extend steps are built and repeated periodically, and the
/// result is condensed into sorted deadlines.
///
/// # Errors
/// Propagates the error of `build_extend_steps`.
///
/// # Panics
/// Panics if the hyper period in nanoseconds cannot be converted into a `u64`.
pub(crate) fn from(ir: &RtLolaMir) -> Result<Schedule, String> {
    // Evaluation periods of the time-driven streams that are not spawned.
    let stream_periods = ir
        .time_driven
        .iter()
        .filter(|tds| !ir.output(tds.reference).is_spawned())
        .map(|tds| tds.period());
    // Periods of all periodically paced spawn conditions.
    let spawn_periods = ir.outputs.iter().filter_map(|o| match &o.spawn.pacing {
        PacingType::Periodic(freq) => {
            Some(UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()))
        },
        _ => None,
    });
    // Periods of all periodically paced close conditions without self reference.
    let close_periods = ir.outputs.iter().filter_map(|o| match &o.close.pacing {
        PacingType::Periodic(freq) if !o.close.has_self_reference => {
            Some(UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv()))
        },
        _ => None,
    });
    let periods = stream_periods
        .chain(spawn_periods)
        .chain(close_periods)
        .collect::<Vec<UOM_Time>>();

    if periods.is_empty() {
        // Nothing to schedule here
        return Ok(Schedule {
            hyper_period: None,
            deadlines: vec![],
        });
    }

    let extend_period = Self::find_extend_period(&periods);
    let hyper_period = Self::find_hyper_period(&periods);
    let initial_steps = Self::build_extend_steps(ir, extend_period, hyper_period)?;
    let repeated_steps = Self::apply_periodicity(&initial_steps);
    let mut deadlines = Self::condense_deadlines(extend_period, repeated_steps);
    Self::sort_deadlines(ir, &mut deadlines);

    let hyper_period_ns = hyper_period.get::<nanosecond>().to_integer().to_u64().unwrap();
    Ok(Schedule {
        hyper_period: Some(Duration::from_nanos(hyper_period_ns)),
        deadlines,
    })
}
/// Computes the longest time span the process may wait between two successive checks for
/// due deadlines without missing any of them.
///
/// The result is the greatest common divisor of all `rates`, computed on their values in
/// nanoseconds.
///
/// # Panics
/// Panics if `rates` is empty.
fn find_extend_period(rates: &[UOM_Time]) -> UOM_Time {
    assert!(!rates.is_empty());
    let nanos = rates
        .iter()
        .map(|rate| rate.get::<nanosecond>())
        .collect::<Vec<Rational>>();
    UOM_Time::new::<nanosecond>(math::rational_gcd_all(&nanos))
}
/// Computes the hyper period of the given `rates`.
///
/// The result is the least common multiple of all `rates` in nanoseconds, additionally
/// combined with one nanosecond so that it is a multiple of 1 ns.
///
/// # Panics
/// Panics if `rates` is empty.
fn find_hyper_period(rates: &[UOM_Time]) -> UOM_Time {
    assert!(!rates.is_empty());
    let nanos = rates
        .iter()
        .map(|rate| rate.get::<nanosecond>())
        .collect::<Vec<Rational>>();
    let lcm_of_rates = math::rational_lcm_all(&nanos);
    // needs to be multiple of 1 ns
    let whole_ns_lcm = math::rational_lcm(lcm_of_rates, Rational::one());
    UOM_Time::new::<nanosecond>(whole_ns_lcm)
}
/// Takes a vec of gcd-sized intervals. In each interval, there are streams that need
/// to be scheduled periodically at this point in time.
/// Example:
/// Hyper-period: 2 seconds, gcd: 500ms, streams: (c @ .5Hz), (b @ 1Hz), (a @ 2Hz)
/// Input: `[[a] [b] [] [c]]`
/// Output: `[[a] [a,b] [a] [a,b,c]]`
fn apply_periodicity(steps: &[Vec<Task>]) -> Vec<Vec<Task>> {
// Whenever there are streams in a cell at index `i`,
// add them to every cell with index k*i within bounds, where k > 1.
// k = 0 would always schedule them initially, so this must be skipped.
// TODO: Skip last half of the array.
let mut res = vec![Vec::new(); steps.len()];
for (ix, streams) in steps.iter().enumerate() {
if !streams.is_empty() {
let mut k = 1;
while let Some(target) = res.get_mut(k * (ix + 1) - 1) {
target.extend(streams);
k += 1;
}
}
}
res
}
/// Creates one cell per gcd-sized time interval up to the hyper period and places every
/// periodic task into the cell of the interval after which it is due for the first time.
///
/// Tasks are the evaluation of time-driven streams that are not spawned, periodic spawns,
/// and periodic closes without self reference.
///
/// Example:
/// Hyper-period: 2 seconds, gcd: 500ms, streams: (c @ .5Hz), (b @ 1Hz), (a @ 2Hz)
/// Result: `[[a] [b] [] [c]]`
/// Meaning: `a` starts being scheduled after one gcd, `b` after two gcds, `c` after 4 gcds.
///
/// # Errors
/// Returns an error if the hyper period spans 10,000,000 or more gcd-sized intervals.
///
/// # Panics
/// Panics if the hyper period or one of the periods is not an integer multiple of `gcd`.
fn build_extend_steps(ir: &RtLolaMir, gcd: UOM_Time, hyper_period: UOM_Time) -> Result<Vec<Vec<Task>>, String> {
    let num_steps = hyper_period.get::<second>() / gcd.get::<second>();
    assert!(num_steps.is_integer());
    let step_count = num_steps.to_integer() as usize;
    if step_count >= 10_000_000 {
        return Err("stream frequencies are too incompatible to generate schedule".to_string());
    }

    // Translates a period into the zero-based index of the cell it belongs to.
    let slot_of = |period: UOM_Time| -> usize {
        let ix = period.get::<second>() / gcd.get::<second>();
        // Period must be integer multiple of gcd by def of gcd
        assert!(ix.is_integer());
        let multiple = ix.to_integer() as usize;
        multiple - 1
    };

    let mut slots: Vec<Vec<Task>> = vec![Vec::new(); step_count];

    // Evaluation of all time-driven streams that are not spawned.
    for tds in ir.time_driven.iter() {
        if ir.output(tds.reference).is_spawned() {
            continue;
        }
        let slot = slot_of(tds.period());
        slots[slot].push(Task::Evaluate(tds.reference.out_ix()));
    }

    // Periodic spawns.
    for o in ir.outputs.iter() {
        if let PacingType::Periodic(freq) = &o.spawn.pacing {
            let out_ix = o.reference.out_ix();
            let period = UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv());
            let slot = slot_of(period);
            slots[slot].push(Task::Spawn(out_ix));
        }
    }

    // Periodic closes, except for those with a self reference.
    for o in ir.outputs.iter() {
        match &o.close.pacing {
            PacingType::Periodic(freq) if !o.close.has_self_reference => {
                let out_ix = o.reference.out_ix();
                let period = UOM_Time::new::<second>(freq.get::<uom::si::frequency::hertz>().inv());
                let slot = slot_of(period);
                slots[slot].push(Task::Close(out_ix));
            },
            _ => {},
        }
    }

    Ok(slots)
}sourcepub fn frequency(&self) -> UOM_Frequency
pub fn frequency(&self) -> UOM_Frequency
Returns the evaluation frequency.
sourcepub fn period_in_duration(&self) -> Duration
pub fn period_in_duration(&self) -> Duration
Returns the evaluation period, i.e., the multiplicative inverse of TimeDrivenStream::frequency, as Duration.
Trait Implementations§
source§impl Clone for TimeDrivenStream
impl Clone for TimeDrivenStream
source§fn clone(&self) -> TimeDrivenStream
fn clone(&self) -> TimeDrivenStream
Returns a copy of the value. Read more
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
source§impl Debug for TimeDrivenStream
impl Debug for TimeDrivenStream
source§impl<'de> Deserialize<'de> for TimeDrivenStream
impl<'de> Deserialize<'de> for TimeDrivenStream
source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
source§impl PartialEq<TimeDrivenStream> for TimeDrivenStream
impl PartialEq<TimeDrivenStream> for TimeDrivenStream
source§fn eq(&self, other: &TimeDrivenStream) -> bool
fn eq(&self, other: &TimeDrivenStream) -> bool
This method tests for self and other values to be equal, and is used by ==.
source§impl Serialize for TimeDrivenStream
impl Serialize for TimeDrivenStream
impl Copy for TimeDrivenStream
impl Eq for TimeDrivenStream
impl StructuralEq for TimeDrivenStream
impl StructuralPartialEq for TimeDrivenStream
Auto Trait Implementations§
impl RefUnwindSafe for TimeDrivenStream
impl Send for TimeDrivenStream
impl Sync for TimeDrivenStream
impl Unpin for TimeDrivenStream
impl UnwindSafe for TimeDrivenStream
Blanket Implementations§
source§impl<Q, K> Equivalent<K> for Qwhere
Q: Eq + ?Sized,
K: Borrow<Q> + ?Sized,
impl<Q, K> Equivalent<K> for Qwhere
Q: Eq + ?Sized,
K: Borrow<Q> + ?Sized,
source§fn equivalent(&self, key: &K) -> bool
fn equivalent(&self, key: &K) -> bool
Compare self to key and return true if they are equal.