response_time_analysis/arrival/
curve.rs

1use std::cell::RefCell;
2use std::collections::VecDeque;
3use std::iter::{self, FromIterator};
4use std::rc::Rc;
5
6use super::{
7    divide_with_ceil, nonzero_delta_min_iter, ArrivalBound, Periodic, Propagated, Sporadic,
8};
9use crate::time::{Duration, Offset};
10
11/// An arrival curve (also commonly called an "upper event arrival
12/// curve" *η+*) that can describe arbitrarily bursty sporadic
13/// arrival processes.
14///
15/// As is common, the arrival curve is defined by a finite *delta-min
16/// vector* that describes the minimum interval length in which a
17/// certain number of jobs may arrive.
18#[derive(Clone, Debug)]
19pub struct Curve {
20    min_distance: Vec<Duration>,
21}
22
23impl Curve {
24    /// Construct a new arrival curve.
25    ///
26    /// The `delta_min_prefix` is the *delta-min prefix* describing
27    /// the arrival curve. Each element of the vector gives the
28    /// minimum interval length in which a corresponding number of
29    /// jobs may arrive.
30
31    /// **Convention**: we do not store the mininum distance for 0
32    /// and 1 jobs. So the `min_distance` vector at offset 0 contains
33    /// the minimum separation of two jobs, the `min_distance` vector
34    /// at offset 1 contains the length of the shortest interval in
35    /// which three jobs arrive, and so on.
36    pub fn new(delta_min_prefix: Vec<Duration>) -> Curve {
37        assert!(!delta_min_prefix.is_empty());
38        Curve {
39            min_distance: delta_min_prefix,
40        }
41    }
42
43    /// Obtain an arrival curve by inferring a delta-min vector from
44    /// any given arrival process `T`.
45    ///
46    /// The delta-min vector is chosen such that it covers all
47    /// arrivals until the given `horizon`.
48    pub fn from_arrival_bound_until<T: ArrivalBound>(ab: &T, horizon: Duration) -> Curve {
49        Self::new(
50            nonzero_delta_min_iter(&ab)
51                .enumerate()
52                .take_while(|(count, (_njobs, delta))| *delta <= horizon || *count < 2)
53                .map(|(_count, (_njobs, delta))| delta)
54                .collect(),
55        )
56    }
57
58    /// Obtain an arrival curve by inferring a delta-min vector from
59    /// any given arrival process `T`.
60    ///
61    /// The delta-min vector is chosen such that it covers at least
62    /// `up_to_njobs` job arrivals.
63    pub fn from_arrival_bound<T: ArrivalBound>(ab: &T, up_to_njobs: usize) -> Curve {
64        Self::new(
65            nonzero_delta_min_iter(&ab)
66                .enumerate()
67                .take_while(|(count, (njobs, _delta))| *njobs <= up_to_njobs || *count < 2)
68                .map(|(_count, (_njobs, delta))| delta)
69                .collect(),
70        )
71    }
72
73    /// Obtain an arrival curve by inferring a delta-min prefix from
74    /// a given trace of arrival events, expressed as the offset of
75    /// the event from a reference time zero.
76    ///
77    /// The resultant delta-min vector will consist of `prefix_jobs`
78    /// entries (if there are a sufficient number of arrivals in the
79    /// trace).
80    pub fn from_trace(arrival_times: impl Iterator<Item = Offset>, prefix_jobs: usize) -> Curve {
81        let mut d: Vec<Duration> = Vec::with_capacity(prefix_jobs);
82        let mut window: VecDeque<Offset> = VecDeque::with_capacity(prefix_jobs + 1);
83
84        // consider all job arrivals in the trace
85        for t in arrival_times {
86            // sanity check: the arrival times must be monotonic
87            assert!(t >= *(window.back().unwrap_or(&t)));
88            // look at all arrival times in the sliding window, in order
89            // from most recent to oldest
90            for (i, v) in window.iter().rev().enumerate() {
91                // Compute the separation from the current arrival t to the arrival
92                // of the (i + 1)-th preceding job.
93                // So if i=0, we are looking at two adjacent jobs.
94                let observed_gap = v.distance_to(t);
95                if d.len() <= i {
96                    // we have not yet seen (i + 2) jobs in a row -> first sample
97                    d.push(observed_gap)
98                } else {
99                    // update belief if we have seen two events with
100                    // less separation than previously observed
101                    d[i] = d[i].min(observed_gap)
102                }
103            }
104            // add arrival time to sliding window
105            window.push_back(t);
106            // trim sliding window if necessary
107            if window.len() > prefix_jobs {
108                window.pop_front();
109            }
110        }
111
112        // FIXME: d must not be empty
113        Curve::new(d)
114    }
115
116    fn extrapolate_next(&self) -> Duration {
117        let n = self.min_distance.len();
118        assert!(n >= 2);
119        // we are using n - k - 1 here because we don't store n=0 and n=1, so the
120        // index is offset by 2
121        (0..=(n / 2))
122            .map(|k| self.min_distance[k] + self.min_distance[n - k - 1])
123            .max()
124            .unwrap()
125    }
126
127    fn can_extrapolate(&self) -> bool {
128        // we cannot meaningfully extrapolate degenerate cases, so let's skip those
129        self.min_distance.len() >= 2
130    }
131
132    /// Extend the underlying delta-min prefix by exploiting the
133    /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
134    /// of proper arrival curves until the delta-min prefix covers
135    /// intervals of length `horizon`.
136    pub fn extrapolate(&mut self, horizon: Duration) {
137        if self.can_extrapolate() {
138            while self.largest_known_distance() < horizon {
139                self.min_distance.push(self.extrapolate_next())
140            }
141        }
142    }
143
144    /// Extend the underlying delta-min prefix by exploiting the
145    /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
146    /// of proper arrival curves by `n` elements.
147    pub fn extrapolate_steps(&mut self, n: usize) {
148        if self.can_extrapolate() {
149            while self.jobs_in_largest_known_distance() < n {
150                self.min_distance.push(self.extrapolate_next())
151            }
152        }
153    }
154
155    /// Extend the underlying delta-min prefix by one element while
156    /// exploiting _both_ the
157    /// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
158    /// of proper arrival curves _and_ the provided bound on the next
159    /// step. The bound is a tuple (`delta`, `njobs`), where `njobs`
160    /// *must* be the element that the underlying delta-min vector is
161    /// extrapolated to.
162    pub fn extrapolate_with_bound(&mut self, bound: (Duration, usize)) {
163        let (delta, njobs) = bound;
164        // We subtract epsilon since we store the distance
165        // between jobs, not the interval length.
166        let dmin = delta - Duration::epsilon();
167        // check that we've been given the expected upper bound
168        // (+ 2 because we don't store the values for 0 and 1 jobs)
169        if self.min_distance.len() + 2 == njobs {
170            if self.can_extrapolate() {
171                let extrapolated = self.extrapolate_next();
172                self.min_distance.push(dmin.max(extrapolated))
173            } else {
174                // If we cannot extrapolate, simply take the given bound.
175                self.min_distance.push(dmin)
176            }
177        }
178    }
179
180    fn min_job_separation(&self) -> Duration {
181        // minimum separation of two jobs given by first element
182        self.min_distance[0]
183    }
184
185    fn largest_known_distance(&self) -> Duration {
186        *self.min_distance.last().unwrap()
187    }
188
189    fn jobs_in_largest_known_distance(&self) -> usize {
190        self.min_distance.len()
191    }
192
193    // note: does not extrapolate
194    fn lookup_arrivals(&self, delta: Duration) -> usize {
195        // TODO: for really large vectors, this should be a binary search...
196        for (i, distance_of_njobs) in self.min_distance.iter().enumerate() {
197            let njobs = i + 2; // we do not store n=0 and n=1
198            if delta <= *distance_of_njobs {
199                return njobs - 1;
200            }
201        }
202        // should never get here
203        panic!()
204    }
205
206    /// Return a lower bound on the length of an interval in which
207    /// `n` arrival events occur.
208    ///
209    /// Note: does not extrapolate, so extremely pessimistic if `n`
210    /// exceeds the length of the internal minimum-distance prefix.
211    pub fn min_distance(&self, n: usize) -> Duration {
212        if n > 1 {
213            // account for the fact that we store distances only for 2+ jobs
214            self.min_distance[(n - 2).min(self.min_distance.len() - 1)]
215        } else {
216            Duration::zero()
217        }
218    }
219}
220
221impl FromIterator<Duration> for Curve {
222    fn from_iter<I: IntoIterator<Item = Duration>>(iter: I) -> Curve {
223        let mut distances: Vec<Duration> = iter.into_iter().collect();
224        // ensure the min-distance function is monotonic
225        for i in 1..distances.len() {
226            distances[i] = distances[i].max(distances[i - 1]);
227        }
228        assert!(!distances.is_empty());
229        Curve {
230            min_distance: distances,
231        }
232    }
233}
234
235impl ArrivalBound for Curve {
236    fn number_arrivals(&self, delta: Duration) -> usize {
237        if delta.is_non_zero() {
238            // first, resolve long delta by super-additivity of arrival curves
239            let prefix = delta / self.largest_known_distance();
240            let prefix_jobs = prefix as usize * self.jobs_in_largest_known_distance();
241            let tail = delta % self.largest_known_distance();
242            if tail > self.min_job_separation() {
243                prefix_jobs + self.lookup_arrivals(tail) as usize
244            } else {
245                prefix_jobs + tail.is_non_zero() as usize
246            }
247        } else {
248            0
249        }
250    }
251
252    fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
253        let diffs: Vec<_> = iter::once(Duration::zero())
254            .chain(self.min_distance.iter().copied())
255            .zip(self.min_distance.iter().copied())
256            .map(|(a, b)| b - a)
257            .filter(|d| d.is_non_zero())
258            .collect();
259
260        struct StepsIter {
261            sum: Duration,
262            step_sizes: Vec<Duration>,
263            idx: usize,
264        }
265
266        impl Iterator for StepsIter {
267            type Item = Duration;
268
269            fn next(&mut self) -> Option<Self::Item> {
270                let val = self.sum;
271                self.sum += self.step_sizes[self.idx];
272                self.idx = (self.idx + 1) % self.step_sizes.len();
273                Some(val)
274            }
275        }
276
277        Box::new(StepsIter {
278            sum: Duration::from(1),
279            step_sizes: diffs,
280            idx: 0,
281        })
282    }
283
284    fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
285        Box::new(Propagated::with_jitter(self, jitter))
286    }
287}
288
289impl From<Periodic> for Curve {
290    fn from(p: Periodic) -> Self {
291        Curve {
292            min_distance: vec![p.period],
293        }
294    }
295}
296
297impl From<Sporadic> for Curve {
298    fn from(s: Sporadic) -> Self {
299        let jitter_jobs = divide_with_ceil(s.jitter, s.min_inter_arrival);
300        // Jitter can cause pessimism when applying super-additivity.
301        // Memory is cheap. Hence, unroll quite a bit to avoid running into
302        // pessimism too frequently.
303        // By default, unroll until the jitter jobs are no more than 10% of the
304        // jobs of the jobs in the unrolled interval, and until for at least 500 jobs.
305        let n = 500.max(jitter_jobs as usize * 10);
306        Curve::from_arrival_bound(&s, n)
307    }
308}
309
310/// An arrival curve that automatically extrapolates and
311/// caches extrapolation results using interior mutability.
312#[derive(Clone)]
313pub struct ExtrapolatingCurve {
314    prefix: Rc<RefCell<Curve>>,
315}
316
317impl ExtrapolatingCurve {
318    /// Construct a new auto-extrapolating arrival curve by wrapping
319    /// a given non-extrapolating curve.
320    pub fn new(curve: Curve) -> Self {
321        ExtrapolatingCurve {
322            prefix: Rc::new(RefCell::new(curve)),
323        }
324    }
325}
326
327impl ArrivalBound for ExtrapolatingCurve {
328    fn number_arrivals(&self, delta: Duration) -> usize {
329        if delta.is_zero() {
330            // special case: delta=0 always yields 0
331            0
332        } else {
333            // extrapolate up to the requested duration
334            let mut curve = self.prefix.borrow_mut();
335            curve.extrapolate(delta + Duration::from(1));
336            curve.number_arrivals(delta)
337        }
338    }
339
340    fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
341        struct StepsIter<'a> {
342            dist: Duration,
343            curve: &'a ExtrapolatingCurve,
344            njobs: usize,
345        }
346
347        impl<'a> StepsIter<'a> {
348            fn advance(&mut self) {
349                let mut prefix = self.curve.prefix.borrow_mut();
350                while prefix.min_distance(self.njobs) <= self.dist {
351                    prefix.extrapolate_steps(self.njobs + 1);
352                    self.njobs += 1
353                }
354                self.dist = prefix.min_distance(self.njobs);
355            }
356        }
357
358        impl<'a> Iterator for StepsIter<'a> {
359            type Item = Duration;
360
361            fn next(&mut self) -> Option<Self::Item> {
362                let val = Duration::from(1) + self.dist;
363                self.advance();
364                Some(val)
365            }
366        }
367
368        let prefix = self.prefix.borrow();
369        if prefix.can_extrapolate() {
370            Box::new(StepsIter {
371                dist: Duration::zero(),
372                curve: self,
373                njobs: 0,
374            })
375        } else {
376            // degenerate case --- don't have info to extrapolate,
377            // so just return the periodic process implied by the single-value
378            // dmin function
379            let period = prefix.min_distance(2);
380            Box::new((0..).map(move |j| period * j + Duration::from(1)))
381        }
382    }
383
384    fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
385        Box::new(Propagated::with_jitter(self, jitter))
386    }
387}