response_time_analysis/arrival/curve.rs
1use std::cell::RefCell;
2use std::collections::VecDeque;
3use std::iter::{self, FromIterator};
4use std::rc::Rc;
5
6use super::{
7 divide_with_ceil, nonzero_delta_min_iter, ArrivalBound, Periodic, Propagated, Sporadic,
8};
9use crate::time::{Duration, Offset};
10
11/// An arrival curve (also commonly called an "upper event arrival
12/// curve" *η+*) that can describe arbitrarily bursty sporadic
13/// arrival processes.
14///
15/// As is common, the arrival curve is defined by a finite *delta-min
16/// vector* that describes the minimum interval length in which a
17/// certain number of jobs may arrive.
18#[derive(Clone, Debug)]
19pub struct Curve {
20 min_distance: Vec<Duration>,
21}
22
23impl Curve {
24 /// Construct a new arrival curve.
25 ///
26 /// The `delta_min_prefix` is the *delta-min prefix* describing
27 /// the arrival curve. Each element of the vector gives the
28 /// minimum interval length in which a corresponding number of
29 /// jobs may arrive.
30
31 /// **Convention**: we do not store the mininum distance for 0
32 /// and 1 jobs. So the `min_distance` vector at offset 0 contains
33 /// the minimum separation of two jobs, the `min_distance` vector
34 /// at offset 1 contains the length of the shortest interval in
35 /// which three jobs arrive, and so on.
36 pub fn new(delta_min_prefix: Vec<Duration>) -> Curve {
37 assert!(!delta_min_prefix.is_empty());
38 Curve {
39 min_distance: delta_min_prefix,
40 }
41 }
42
43 /// Obtain an arrival curve by inferring a delta-min vector from
44 /// any given arrival process `T`.
45 ///
46 /// The delta-min vector is chosen such that it covers all
47 /// arrivals until the given `horizon`.
48 pub fn from_arrival_bound_until<T: ArrivalBound>(ab: &T, horizon: Duration) -> Curve {
49 Self::new(
50 nonzero_delta_min_iter(&ab)
51 .enumerate()
52 .take_while(|(count, (_njobs, delta))| *delta <= horizon || *count < 2)
53 .map(|(_count, (_njobs, delta))| delta)
54 .collect(),
55 )
56 }
57
58 /// Obtain an arrival curve by inferring a delta-min vector from
59 /// any given arrival process `T`.
60 ///
61 /// The delta-min vector is chosen such that it covers at least
62 /// `up_to_njobs` job arrivals.
63 pub fn from_arrival_bound<T: ArrivalBound>(ab: &T, up_to_njobs: usize) -> Curve {
64 Self::new(
65 nonzero_delta_min_iter(&ab)
66 .enumerate()
67 .take_while(|(count, (njobs, _delta))| *njobs <= up_to_njobs || *count < 2)
68 .map(|(_count, (_njobs, delta))| delta)
69 .collect(),
70 )
71 }
72
73 /// Obtain an arrival curve by inferring a delta-min prefix from
74 /// a given trace of arrival events, expressed as the offset of
75 /// the event from a reference time zero.
76 ///
77 /// The resultant delta-min vector will consist of `prefix_jobs`
78 /// entries (if there are a sufficient number of arrivals in the
79 /// trace).
80 pub fn from_trace(arrival_times: impl Iterator<Item = Offset>, prefix_jobs: usize) -> Curve {
81 let mut d: Vec<Duration> = Vec::with_capacity(prefix_jobs);
82 let mut window: VecDeque<Offset> = VecDeque::with_capacity(prefix_jobs + 1);
83
84 // consider all job arrivals in the trace
85 for t in arrival_times {
86 // sanity check: the arrival times must be monotonic
87 assert!(t >= *(window.back().unwrap_or(&t)));
88 // look at all arrival times in the sliding window, in order
89 // from most recent to oldest
90 for (i, v) in window.iter().rev().enumerate() {
91 // Compute the separation from the current arrival t to the arrival
92 // of the (i + 1)-th preceding job.
93 // So if i=0, we are looking at two adjacent jobs.
94 let observed_gap = v.distance_to(t);
95 if d.len() <= i {
96 // we have not yet seen (i + 2) jobs in a row -> first sample
97 d.push(observed_gap)
98 } else {
99 // update belief if we have seen two events with
100 // less separation than previously observed
101 d[i] = d[i].min(observed_gap)
102 }
103 }
104 // add arrival time to sliding window
105 window.push_back(t);
106 // trim sliding window if necessary
107 if window.len() > prefix_jobs {
108 window.pop_front();
109 }
110 }
111
112 // FIXME: d must not be empty
113 Curve::new(d)
114 }
115
116 fn extrapolate_next(&self) -> Duration {
117 let n = self.min_distance.len();
118 assert!(n >= 2);
119 // we are using n - k - 1 here because we don't store n=0 and n=1, so the
120 // index is offset by 2
121 (0..=(n / 2))
122 .map(|k| self.min_distance[k] + self.min_distance[n - k - 1])
123 .max()
124 .unwrap()
125 }
126
127 fn can_extrapolate(&self) -> bool {
128 // we cannot meaningfully extrapolate degenerate cases, so let's skip those
129 self.min_distance.len() >= 2
130 }
131
132 /// Extend the underlying delta-min prefix by exploiting the
133 /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
134 /// of proper arrival curves until the delta-min prefix covers
135 /// intervals of length `horizon`.
136 pub fn extrapolate(&mut self, horizon: Duration) {
137 if self.can_extrapolate() {
138 while self.largest_known_distance() < horizon {
139 self.min_distance.push(self.extrapolate_next())
140 }
141 }
142 }
143
144 /// Extend the underlying delta-min prefix by exploiting the
145 /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
146 /// of proper arrival curves by `n` elements.
147 pub fn extrapolate_steps(&mut self, n: usize) {
148 if self.can_extrapolate() {
149 while self.jobs_in_largest_known_distance() < n {
150 self.min_distance.push(self.extrapolate_next())
151 }
152 }
153 }
154
155 /// Extend the underlying delta-min prefix by one element while
156 /// exploiting _both_ the
157 /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
158 /// of proper arrival curves _and_ the provided bound on the next
159 /// step. The bound is a tuple (`delta`, `njobs`), where `njobs`
160 /// *must* be the element that the underlying delta-min vector is
161 /// extrapolated to.
162 pub fn extrapolate_with_bound(&mut self, bound: (Duration, usize)) {
163 let (delta, njobs) = bound;
164 // We subtract epsilon since we store the distance
165 // between jobs, not the interval length.
166 let dmin = delta - Duration::epsilon();
167 // check that we've been given the expected upper bound
168 // (+ 2 because we don't store the values for 0 and 1 jobs)
169 if self.min_distance.len() + 2 == njobs {
170 if self.can_extrapolate() {
171 let extrapolated = self.extrapolate_next();
172 self.min_distance.push(dmin.max(extrapolated))
173 } else {
174 // If we cannot extrapolate, simply take the given bound.
175 self.min_distance.push(dmin)
176 }
177 }
178 }
179
180 fn min_job_separation(&self) -> Duration {
181 // minimum separation of two jobs given by first element
182 self.min_distance[0]
183 }
184
185 fn largest_known_distance(&self) -> Duration {
186 *self.min_distance.last().unwrap()
187 }
188
189 fn jobs_in_largest_known_distance(&self) -> usize {
190 self.min_distance.len()
191 }
192
193 // note: does not extrapolate
194 fn lookup_arrivals(&self, delta: Duration) -> usize {
195 // TODO: for really large vectors, this should be a binary search...
196 for (i, distance_of_njobs) in self.min_distance.iter().enumerate() {
197 let njobs = i + 2; // we do not store n=0 and n=1
198 if delta <= *distance_of_njobs {
199 return njobs - 1;
200 }
201 }
202 // should never get here
203 panic!()
204 }
205
206 /// Return a lower bound on the length of an interval in which
207 /// `n` arrival events occur.
208 ///
209 /// Note: does not extrapolate, so extremely pessimistic if `n`
210 /// exceeds the length of the internal minimum-distance prefix.
211 pub fn min_distance(&self, n: usize) -> Duration {
212 if n > 1 {
213 // account for the fact that we store distances only for 2+ jobs
214 self.min_distance[(n - 2).min(self.min_distance.len() - 1)]
215 } else {
216 Duration::zero()
217 }
218 }
219}
220
221impl FromIterator<Duration> for Curve {
222 fn from_iter<I: IntoIterator<Item = Duration>>(iter: I) -> Curve {
223 let mut distances: Vec<Duration> = iter.into_iter().collect();
224 // ensure the min-distance function is monotonic
225 for i in 1..distances.len() {
226 distances[i] = distances[i].max(distances[i - 1]);
227 }
228 assert!(!distances.is_empty());
229 Curve {
230 min_distance: distances,
231 }
232 }
233}
234
235impl ArrivalBound for Curve {
236 fn number_arrivals(&self, delta: Duration) -> usize {
237 if delta.is_non_zero() {
238 // first, resolve long delta by super-additivity of arrival curves
239 let prefix = delta / self.largest_known_distance();
240 let prefix_jobs = prefix as usize * self.jobs_in_largest_known_distance();
241 let tail = delta % self.largest_known_distance();
242 if tail > self.min_job_separation() {
243 prefix_jobs + self.lookup_arrivals(tail) as usize
244 } else {
245 prefix_jobs + tail.is_non_zero() as usize
246 }
247 } else {
248 0
249 }
250 }
251
252 fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
253 let diffs: Vec<_> = iter::once(Duration::zero())
254 .chain(self.min_distance.iter().copied())
255 .zip(self.min_distance.iter().copied())
256 .map(|(a, b)| b - a)
257 .filter(|d| d.is_non_zero())
258 .collect();
259
260 struct StepsIter {
261 sum: Duration,
262 step_sizes: Vec<Duration>,
263 idx: usize,
264 }
265
266 impl Iterator for StepsIter {
267 type Item = Duration;
268
269 fn next(&mut self) -> Option<Self::Item> {
270 let val = self.sum;
271 self.sum += self.step_sizes[self.idx];
272 self.idx = (self.idx + 1) % self.step_sizes.len();
273 Some(val)
274 }
275 }
276
277 Box::new(StepsIter {
278 sum: Duration::from(1),
279 step_sizes: diffs,
280 idx: 0,
281 })
282 }
283
284 fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
285 Box::new(Propagated::with_jitter(self, jitter))
286 }
287}
288
289impl From<Periodic> for Curve {
290 fn from(p: Periodic) -> Self {
291 Curve {
292 min_distance: vec![p.period],
293 }
294 }
295}
296
297impl From<Sporadic> for Curve {
298 fn from(s: Sporadic) -> Self {
299 let jitter_jobs = divide_with_ceil(s.jitter, s.min_inter_arrival);
300 // Jitter can cause pessimism when applying super-additivity.
301 // Memory is cheap. Hence, unroll quite a bit to avoid running into
302 // pessimism too frequently.
303 // By default, unroll until the jitter jobs are no more than 10% of the
304 // jobs of the jobs in the unrolled interval, and until for at least 500 jobs.
305 let n = 500.max(jitter_jobs as usize * 10);
306 Curve::from_arrival_bound(&s, n)
307 }
308}
309
310/// An arrival curve that automatically extrapolates and
311/// caches extrapolation results using interior mutability.
312#[derive(Clone)]
313pub struct ExtrapolatingCurve {
314 prefix: Rc<RefCell<Curve>>,
315}
316
317impl ExtrapolatingCurve {
318 /// Construct a new auto-extrapolating arrival curve by wrapping
319 /// a given non-extrapolating curve.
320 pub fn new(curve: Curve) -> Self {
321 ExtrapolatingCurve {
322 prefix: Rc::new(RefCell::new(curve)),
323 }
324 }
325}
326
327impl ArrivalBound for ExtrapolatingCurve {
328 fn number_arrivals(&self, delta: Duration) -> usize {
329 if delta.is_zero() {
330 // special case: delta=0 always yields 0
331 0
332 } else {
333 // extrapolate up to the requested duration
334 let mut curve = self.prefix.borrow_mut();
335 curve.extrapolate(delta + Duration::from(1));
336 curve.number_arrivals(delta)
337 }
338 }
339
340 fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
341 struct StepsIter<'a> {
342 dist: Duration,
343 curve: &'a ExtrapolatingCurve,
344 njobs: usize,
345 }
346
347 impl<'a> StepsIter<'a> {
348 fn advance(&mut self) {
349 let mut prefix = self.curve.prefix.borrow_mut();
350 while prefix.min_distance(self.njobs) <= self.dist {
351 prefix.extrapolate_steps(self.njobs + 1);
352 self.njobs += 1
353 }
354 self.dist = prefix.min_distance(self.njobs);
355 }
356 }
357
358 impl<'a> Iterator for StepsIter<'a> {
359 type Item = Duration;
360
361 fn next(&mut self) -> Option<Self::Item> {
362 let val = Duration::from(1) + self.dist;
363 self.advance();
364 Some(val)
365 }
366 }
367
368 let prefix = self.prefix.borrow();
369 if prefix.can_extrapolate() {
370 Box::new(StepsIter {
371 dist: Duration::zero(),
372 curve: self,
373 njobs: 0,
374 })
375 } else {
376 // degenerate case --- don't have info to extrapolate,
377 // so just return the periodic process implied by the single-value
378 // dmin function
379 let period = prefix.min_distance(2);
380 Box::new((0..).map(move |j| period * j + Duration::from(1)))
381 }
382 }
383
384 fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
385 Box::new(Propagated::with_jitter(self, jitter))
386 }
387}