1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
use std::cell::RefCell;
use std::collections::VecDeque;
use std::iter::{self, FromIterator};
use std::rc::Rc;
use super::{
divide_with_ceil, nonzero_delta_min_iter, ArrivalBound, Periodic, Propagated, Sporadic,
};
use crate::time::{Duration, Offset};
/// An arrival curve (also commonly called an "upper event arrival
/// curve" *η+*) that can describe arbitrarily bursty sporadic
/// arrival processes.
///
/// As is common, the arrival curve is defined by a finite *delta-min
/// vector* that describes the minimum interval length in which a
/// certain number of jobs may arrive.
#[derive(Clone, Debug)]
pub struct Curve {
/// The delta-min prefix. Entry `i` is the minimum distance spanned
/// by `i + 2` jobs; the trivial entries for 0 and 1 jobs are not
/// stored. Every constructor in this file rejects an empty vector.
min_distance: Vec<Duration>,
}
impl Curve {
/// Construct a new arrival curve.
///
/// The `delta_min_prefix` is the *delta-min prefix* describing
/// the arrival curve. Each element of the vector gives the
/// minimum interval length in which a corresponding number of
/// jobs may arrive.
/// **Convention**: we do not store the mininum distance for 0
/// and 1 jobs. So the `min_distance` vector at offset 0 contains
/// the minimum separation of two jobs, the `min_distance` vector
/// at offset 1 contains the length of the shortest interval in
/// which three jobs arrive, and so on.
pub fn new(delta_min_prefix: Vec<Duration>) -> Curve {
assert!(!delta_min_prefix.is_empty());
Curve {
min_distance: delta_min_prefix,
}
}
/// Derive an arrival curve from an arbitrary arrival process `ab`
/// by sampling its delta-min vector up to a time horizon.
///
/// Entries produced by `nonzero_delta_min_iter` are kept for as long
/// as their distance does not exceed `horizon`. The first two
/// entries are always kept, even if they lie beyond the horizon.
///
/// # Panics
///
/// Panics (in [`Curve::new`]) if no entry at all is produced.
pub fn from_arrival_bound_until<T: ArrivalBound>(ab: &T, horizon: Duration) -> Curve {
    let mut prefix = Vec::new();
    for (idx, (_njobs, dist)) in nonzero_delta_min_iter(&ab).enumerate() {
        // stop at the first entry past the horizon, but never before
        // two entries have been collected
        if idx >= 2 && dist > horizon {
            break;
        }
        prefix.push(dist);
    }
    Self::new(prefix)
}
/// Derive an arrival curve from an arbitrary arrival process `ab`
/// by sampling its delta-min vector up to a given number of jobs.
///
/// Entries produced by `nonzero_delta_min_iter` are kept for as long
/// as the job count they report does not exceed `up_to_njobs`. The
/// first two entries are always kept, regardless of their job count.
///
/// # Panics
///
/// Panics (in [`Curve::new`]) if no entry at all is produced.
pub fn from_arrival_bound<T: ArrivalBound>(ab: &T, up_to_njobs: usize) -> Curve {
    let mut prefix = Vec::new();
    for (idx, (njobs, dist)) in nonzero_delta_min_iter(&ab).enumerate() {
        // stop at the first entry describing too many jobs, but never
        // before two entries have been collected
        if idx >= 2 && njobs > up_to_njobs {
            break;
        }
        prefix.push(dist);
    }
    Self::new(prefix)
}
/// Obtain an arrival curve by inferring a delta-min prefix from
/// a given trace of arrival events, expressed as the offset of
/// the event from a reference time zero.
///
/// The resultant delta-min vector consists of at most `prefix_jobs`
/// entries (fewer if the trace does not contain a sufficient number
/// of arrivals).
///
/// # Panics
///
/// Panics if the arrival times are not non-decreasing, or if no
/// delta-min entry can be inferred at all, which is the case when
/// the trace contains fewer than two arrivals or when `prefix_jobs`
/// is zero.
pub fn from_trace(arrival_times: impl Iterator<Item = Offset>, prefix_jobs: usize) -> Curve {
    // d[i] is the smallest separation observed so far between an
    // arrival and its (i + 1)-th predecessor, i.e., across (i + 2) jobs
    let mut d: Vec<Duration> = Vec::with_capacity(prefix_jobs);
    // the most recent arrivals; holds one extra element only between
    // the push and the trim below, hence the `+ 1`
    let mut window: VecDeque<Offset> = VecDeque::with_capacity(prefix_jobs + 1);

    // consider all job arrivals in the trace
    for t in arrival_times {
        // sanity check: the arrival times must be monotonic
        if let Some(latest) = window.back() {
            assert!(
                t >= *latest,
                "from_trace: arrival times must be non-decreasing"
            );
        }

        // look at all arrival times in the sliding window, in order
        // from most recent to oldest
        for (i, v) in window.iter().rev().enumerate() {
            // Separation from the current arrival t to the arrival of
            // the (i + 1)-th preceding job. So if i=0, we are looking
            // at two adjacent jobs.
            let observed_gap = v.distance_to(t);
            if d.len() <= i {
                // we have not yet seen (i + 2) jobs in a row -> first sample
                d.push(observed_gap)
            } else {
                // update belief if we have seen two events with
                // less separation than previously observed
                d[i] = d[i].min(observed_gap)
            }
        }

        // add arrival time to sliding window
        window.push_back(t);
        // trim sliding window if necessary
        if window.len() > prefix_jobs {
            window.pop_front();
        }
    }

    // A curve needs at least one delta-min entry; fail with a clear
    // message here rather than on the bare assertion in `Curve::new`.
    assert!(
        !d.is_empty(),
        "from_trace: need at least two arrivals and prefix_jobs >= 1 to infer an arrival curve"
    );
    Curve::new(d)
}
/// Compute the next (not yet stored) delta-min entry from the known
/// prefix.
///
/// With `n` stored entries, the result is the largest sum
/// `min_distance[k] + min_distance[n - k - 1]` over `k` in
/// `0..=n / 2`. By the storage convention (entry `i` describes
/// `i + 2` jobs), each such pair describes `k + 2` and `n - k + 1`
/// jobs.
///
/// # Panics
///
/// Panics if fewer than two entries are stored.
fn extrapolate_next(&self) -> Duration {
    let known = &self.min_distance;
    let n = known.len();
    assert!(n >= 2);
    // we are using n - k - 1 here because we don't store n=0 and n=1,
    // so the index is offset by 2
    let mut best = known[0] + known[n - 1];
    for k in 1..=(n / 2) {
        best = best.max(known[k] + known[n - k - 1]);
    }
    best
}
/// Whether the stored prefix is long enough to be extrapolated.
///
/// Degenerate prefixes with fewer than two entries cannot be
/// meaningfully extrapolated and are skipped.
fn can_extrapolate(&self) -> bool {
    self.min_distance.len() > 1
}
/// Extend the underlying delta-min prefix by exploiting the
/// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
/// of proper arrival curves until the last stored distance is at
/// least `horizon`.
///
/// Does nothing if the prefix has fewer than two entries.
///
/// NOTE(review): a prefix consisting solely of zero distances never
/// reaches a non-zero `horizon`, so this loop would not terminate
/// for it; confirm that callers rule out such prefixes.
pub fn extrapolate(&mut self, horizon: Duration) {
    if !self.can_extrapolate() {
        return;
    }
    while self.largest_known_distance() < horizon {
        let next = self.extrapolate_next();
        self.min_distance.push(next);
    }
}
/// Extend the underlying delta-min prefix by exploiting the
/// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
/// of proper arrival curves until it holds at least `n` entries.
///
/// Does nothing if the prefix has fewer than two entries or already
/// holds `n` or more entries.
pub fn extrapolate_steps(&mut self, n: usize) {
    if !self.can_extrapolate() {
        return;
    }
    while self.jobs_in_largest_known_distance() < n {
        let next = self.extrapolate_next();
        self.min_distance.push(next);
    }
}
/// Extend the underlying delta-min prefix by one element while
/// exploiting _both_ the
/// [subadditivity](https://en.wikipedia.org/wiki/Subadditivity)
/// of proper arrival curves _and_ an externally provided bound on
/// the next step.
///
/// The bound is a tuple (`delta`, `njobs`), where `njobs` *must* be
/// the job count of the entry that would be appended next (number
/// of stored entries plus two). A bound for any other job count is
/// silently ignored. The appended value is the larger of
/// `delta - epsilon` and the extrapolated value; if the prefix is
/// too short to extrapolate, `delta - epsilon` is taken as is.
pub fn extrapolate_with_bound(&mut self, bound: (Duration, usize)) {
    let (delta, njobs) = bound;
    // We subtract epsilon since we store the distance
    // between jobs, not the interval length.
    let dmin = delta - Duration::epsilon();

    // Only accept a bound for the expected next entry
    // (+ 2 because we don't store the values for 0 and 1 jobs).
    if njobs != self.min_distance.len() + 2 {
        return;
    }

    let next = if self.can_extrapolate() {
        dmin.max(self.extrapolate_next())
    } else {
        // If we cannot extrapolate, simply take the given bound.
        dmin
    };
    self.min_distance.push(next);
}
/// Return the minimum separation of two jobs, i.e., the first entry
/// of the stored delta-min prefix.
fn min_job_separation(&self) -> Duration {
// minimum separation of two jobs given by first element
self.min_distance[0]
}
/// Return the last entry of the stored delta-min prefix.
///
/// Panics if the prefix is empty, which the constructors in this
/// file rule out.
fn largest_known_distance(&self) -> Duration {
*self.min_distance.last().unwrap()
}
/// Return the number of stored delta-min entries.
///
/// `number_arrivals` uses this value as the job count attributed to
/// each full multiple of `largest_known_distance()`, and
/// `extrapolate_steps` uses it as its termination measure.
fn jobs_in_largest_known_distance(&self) -> usize {
self.min_distance.len()
}
/// Look up the number of arrivals for an interval of length `delta`
/// in the stored prefix.
///
/// Finds the first entry whose distance is at least `delta`. That
/// entry describes `i + 2` jobs, so one job fewer is returned.
///
/// Note: does not extrapolate. Panics if `delta` exceeds every
/// stored distance.
fn lookup_arrivals(&self, delta: Duration) -> usize {
    // TODO: for really large vectors, this should be a binary search...
    let first_fit = self.min_distance.iter().position(|dist| delta <= *dist);
    match first_fit {
        // entry i stands for (i + 2) jobs (n=0 and n=1 are not stored);
        // the answer is one job less than that
        Some(i) => i + 1,
        // should never get here
        None => panic!(),
    }
}
/// Return a lower bound on the length of an interval in which
/// `n` arrival events occur.
///
/// For `n <= 1` the bound is zero. For larger `n` the stored entry
/// for `n` jobs is returned; if `n` lies beyond the stored prefix,
/// the last stored entry is returned instead.
///
/// Note: does not extrapolate, so extremely pessimistic if `n`
/// exceeds the length of the internal minimum-distance prefix.
pub fn min_distance(&self, n: usize) -> Duration {
    if n <= 1 {
        return Duration::zero();
    }
    // account for the fact that we store distances only for 2+ jobs,
    // and clamp to the last known entry
    let last = self.min_distance.len() - 1;
    self.min_distance[(n - 2).min(last)]
}
}
impl FromIterator<Duration> for Curve {
    /// Build a curve from a sequence of minimum distances.
    ///
    /// The sequence is forced to be non-decreasing: each value is
    /// raised to its predecessor's (already adjusted) value if it is
    /// smaller.
    ///
    /// # Panics
    ///
    /// Panics if the iterator yields no elements.
    fn from_iter<I: IntoIterator<Item = Duration>>(iter: I) -> Curve {
        let mut distances: Vec<Duration> = Vec::new();
        for dist in iter {
            // ensure the min-distance function is monotonic
            let adjusted = match distances.last() {
                Some(prev) => dist.max(*prev),
                None => dist,
            };
            distances.push(adjusted);
        }
        assert!(!distances.is_empty());
        Curve {
            min_distance: distances,
        }
    }
}
impl ArrivalBound for Curve {
/// Bound the number of arrivals in any interval of length `delta`
/// using only the stored prefix (no extrapolation).
///
/// `delta` is split into full multiples of the largest known
/// distance plus a remaining tail. Each full multiple contributes
/// `jobs_in_largest_known_distance()` jobs. The tail is resolved by
/// a look-up if it exceeds the minimum job separation; otherwise it
/// contributes one job when non-zero and none when zero.
fn number_arrivals(&self, delta: Duration) -> usize {
    if !delta.is_non_zero() {
        return 0;
    }
    // first, resolve long delta by super-additivity of arrival curves
    let span = self.largest_known_distance();
    let whole_spans = delta / span;
    let prefix_jobs = whole_spans as usize * self.jobs_in_largest_known_distance();
    let tail = delta % span;
    let tail_jobs = if tail > self.min_job_separation() {
        self.lookup_arrivals(tail)
    } else {
        tail.is_non_zero() as usize
    };
    prefix_jobs + tail_jobs
}
fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
let diffs: Vec<_> = iter::once(Duration::zero())
.chain(self.min_distance.iter().copied())
.zip(self.min_distance.iter().copied())
.map(|(a, b)| b - a)
.filter(|d| d.is_non_zero())
.collect();
struct StepsIter {
sum: Duration,
step_sizes: Vec<Duration>,
idx: usize,
}
impl Iterator for StepsIter {
type Item = Duration;
fn next(&mut self) -> Option<Self::Item> {
let val = self.sum;
self.sum += self.step_sizes[self.idx];
self.idx = (self.idx + 1) % self.step_sizes.len();
Some(val)
}
}
Box::new(StepsIter {
sum: Duration::from(1),
step_sizes: diffs,
idx: 0,
})
}
/// Return a boxed arrival bound obtained by passing this curve and
/// the given `jitter` to `Propagated::with_jitter`.
fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
    let propagated = Propagated::with_jitter(self, jitter);
    Box::new(propagated)
}
}
impl From<Periodic> for Curve {
fn from(p: Periodic) -> Self {
Curve {
min_distance: vec![p.period],
}
}
}
impl From<Sporadic> for Curve {
    /// Convert a sporadic arrival process (with jitter) into a curve
    /// by unrolling its delta-min vector.
    fn from(s: Sporadic) -> Self {
        let jitter_jobs = divide_with_ceil(s.jitter, s.min_inter_arrival);
        // Jitter can cause pessimism when applying super-additivity.
        // Memory is cheap, so unroll generously to avoid running into
        // that pessimism too frequently: cover at least 500 jobs, and
        // enough jobs that the jitter jobs make up no more than 10% of
        // the jobs in the unrolled interval.
        let up_to_njobs = usize::max(500, jitter_jobs as usize * 10);
        Self::from_arrival_bound(&s, up_to_njobs)
    }
}
/// An arrival curve that automatically extrapolates and
/// caches extrapolation results using interior mutability.
///
/// Cloning copies only the `Rc` handle, so clones share the same
/// underlying prefix.
#[derive(Clone)]
pub struct ExtrapolatingCurve {
/// The wrapped curve; its delta-min prefix is extended in place
/// through the `RefCell` whenever a query needs more entries.
prefix: Rc<RefCell<Curve>>,
}
impl ExtrapolatingCurve {
/// Construct a new auto-extrapolating arrival curve by wrapping
/// a given non-extrapolating curve.
pub fn new(curve: Curve) -> Self {
ExtrapolatingCurve {
prefix: Rc::new(RefCell::new(curve)),
}
}
}
impl ArrivalBound for ExtrapolatingCurve {
/// Bound the number of arrivals in any interval of length `delta`,
/// extrapolating (and caching) the wrapped prefix first.
///
/// The prefix is extrapolated to a horizon of `delta + 1` before
/// the wrapped curve is queried. If the wrapped prefix is too short
/// to extrapolate, it is queried as is.
fn number_arrivals(&self, delta: Duration) -> usize {
    // special case: delta=0 always yields 0
    if delta.is_zero() {
        return 0;
    }
    // extrapolate up to the requested duration
    let mut wrapped = self.prefix.borrow_mut();
    wrapped.extrapolate(delta + Duration::from(1));
    wrapped.number_arrivals(delta)
}
/// Return an endless iterator over step positions of this curve,
/// extending the cached prefix on demand.
///
/// Every yielded value is a minimum distance plus `Duration::from(1)`;
/// the first value is `Duration::from(1)` itself. If the wrapped
/// prefix is too short to extrapolate, the values are
/// `period * j + 1` for `j = 0, 1, 2, ...`, with `period` being the
/// minimum distance for two jobs.
fn steps_iter<'a>(&'a self) -> Box<dyn Iterator<Item = Duration> + 'a> {
// Iterator state: the minimum distance underlying the value to be
// yielded next, and the job count that distance belongs to.
struct StepsIter<'a> {
dist: Duration,
curve: &'a ExtrapolatingCurve,
njobs: usize,
}
impl<'a> StepsIter<'a> {
// Move on to the next job count whose minimum distance strictly
// exceeds the current one, growing the cached prefix as needed.
fn advance(&mut self) {
// The mutable borrow lasts only for this call, so the prefix is
// not borrowed between calls to `next`.
let mut prefix = self.curve.prefix.borrow_mut();
while prefix.min_distance(self.njobs) <= self.dist {
// make sure the prefix is long enough before probing the
// next job count
prefix.extrapolate_steps(self.njobs + 1);
self.njobs += 1
}
self.dist = prefix.min_distance(self.njobs);
}
}
impl<'a> Iterator for StepsIter<'a> {
type Item = Duration;
// Yield the current distance plus one, then advance; never
// returns `None`.
fn next(&mut self) -> Option<Self::Item> {
let val = Duration::from(1) + self.dist;
self.advance();
Some(val)
}
}
// Shared borrow used only to pick the variant below; it is released
// when this function returns.
let prefix = self.prefix.borrow();
if prefix.can_extrapolate() {
Box::new(StepsIter {
dist: Duration::zero(),
curve: self,
njobs: 0,
})
} else {
// degenerate case --- don't have info to extrapolate,
// so just return the periodic process implied by the single-value
// dmin function
let period = prefix.min_distance(2);
Box::new((0..).map(move |j| period * j + Duration::from(1)))
}
}
/// Return a boxed arrival bound obtained by passing this
/// extrapolating curve and the given `jitter` to
/// `Propagated::with_jitter`.
fn clone_with_jitter(&self, jitter: Duration) -> Box<dyn ArrivalBound> {
    let propagated = Propagated::with_jitter(self, jitter);
    Box::new(propagated)
}
}