ic-bn-lib 0.1.19

Internet Computer Boundary Nodes shared modules
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
//! A load-shedding middleware based on [Little's law].
//!
//! This provides middleware for shedding load to maintain a target average
//! latency, see the documentation on the [`LoadShed`] service for more detail.
//!
//! [Little's law]: https://en.wikipedia.org/wiki/Little%27s_law
//!
//! (c) https://github.com/Skepfyr/little-loadshedder

#![warn(missing_debug_implementations)]
#![allow(clippy::significant_drop_tightening)]
#![allow(clippy::significant_drop_in_scrutinee)]
#![forbid(unsafe_code)]

use std::{
    cmp::Ordering,
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex, atomic::AtomicU64},
    task::{Context, Poll},
    time::{Duration, Instant},
};

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tower::{Layer, Service, ServiceExt};

/// Configuration and live state of the load shedder.
///
/// It is held behind an `Arc` by both [`LoadShedLayer`] and [`LoadShed`], so
/// every service created from one layer works on the same instance.
#[derive(Debug)]
pub struct LoadShedConf {
    /// The number of initial requests that are never shed: a request whose
    /// sequence number (see `requests`) is below this value is forwarded to
    /// the inner service even when no permit could be obtained for it.
    passthrough_count: u64,
    /// The target average latency in seconds.
    target: f64,
    /// The exponentially weighted moving average parameter.
    /// Must be in the range (0, 1), `0.25` means new value accounts for 25% of
    /// the moving average.
    ewma_param: f64,
    /// Semaphore controlling the waiting queue of requests.
    /// A request that cannot get a permit from it immediately is treated as
    /// overload.
    available_queue: Arc<Semaphore>,
    /// Semaphore controlling concurrency to the inner service.
    /// Queued requests wait on it asynchronously.
    available_concurrency: Arc<Semaphore>,
    /// Stats about the latency that change with each completed request.
    stats: Mutex<ConfStats>,
    /// Number of requests received so far. It is incremented once per
    /// `call`, before the shedding decision is made, so it also counts
    /// requests that end up being shed.
    requests: AtomicU64,
}

/// Mutable statistics of the load shedder, guarded by the `stats` mutex of
/// [`LoadShedConf`].
#[derive(Debug)]
struct ConfStats {
    /// The current average latency in seconds (exponentially weighted moving
    /// average over every completed request).
    average_latency: f64,
    /// The moving average of the latency, fed only by requests that completed
    /// while the concurrency semaphore was (almost) exhausted, i.e. while
    /// `available_concurrency.available_permits()` was at most
    /// `max(1, concurrency / 10)`.
    average_latency_at_capacity: f64,
    /// The total number of permits handed to the queue semaphore
    /// (the current capacity of the queue), not the number currently free.
    queue_capacity: usize,
    /// The current concurrency limit, i.e. the intended total number of
    /// permits of the `available_concurrency` semaphore.
    concurrency: usize,
    /// The number of in-flight requests (`concurrency` minus the available
    /// permits) observed when the concurrency was last re-evaluated.
    previous_concurrency: usize,
    /// The time that the concurrency was last re-evaluated, to rate limit changing it.
    last_changed: Instant,
    /// Throughput estimate (in-flight requests divided by the average
    /// latency) computed when the concurrency was last re-evaluated.
    previous_throughput: f64,
}

// Little's law: size of system [req] = target latency [s] * throughput [req/s]
// size of queue [req] = size of system [req] - concurrency [req]
// throughput [req/s] = concurrency [req] / average latency of service [s]
// => (size of queue [req] + concurrency[req]) = target latency [s] * concurrency[req] / latency [s]
// => size of queue [req] = concurrency [req] * (target latency [s] / latency [s] - 1)
//
// Control the concurrency:
// increase concurrency but not beyond target latency
//
// Control queue length:
// queue capacity = concurrency * ((target latency / average latency of service) - 1)

impl LoadShedConf {
    /// Creates the shared load-shedding state.
    ///
    /// * `ewma_param` - weight of the newest sample in the moving averages,
    ///   expected to be in the range (0, 1).
    /// * `target` - target average latency in seconds.
    /// * `passthrough_count` - number of initial requests that are never shed.
    ///
    /// Both semaphores start with a single permit and both latency averages
    /// are seeded with `target`.
    pub fn new(ewma_param: f64, target: f64, passthrough_count: u64) -> Self {
        Self {
            passthrough_count,
            target,
            ewma_param,
            available_concurrency: Arc::new(Semaphore::new(1)),
            available_queue: Arc::new(Semaphore::new(1)),
            stats: Mutex::new(ConfStats {
                average_latency: target,
                average_latency_at_capacity: target,
                queue_capacity: 1,
                concurrency: 1,
                previous_concurrency: 0,
                last_changed: Instant::now(),
                previous_throughput: 0.0,
            }),
            requests: AtomicU64::new(0),
        }
    }

    /// Add ourselves to the queue and wait until we've made it through and have
    /// obtained a permit to send the request.
    ///
    /// Returns `None` when the queue is full (overload), otherwise the
    /// concurrency permit that must be held while the inner service runs.
    async fn start(&self) -> Option<OwnedSemaphorePermit> {
        {
            // Work inside a block so we drop the stats lock asap.
            let mut stats = self.stats.lock().expect("To be able to lock stats");

            // Upper bound for the queue: the semaphore cannot hold more than
            // `Semaphore::MAX_PERMITS` (adding more panics) and shrinking goes
            // through `try_acquire_many`, which takes a `u32`.
            let max_queue_capacity = Semaphore::MAX_PERMITS.min(u32::MAX as usize);

            // Use average latency at (concurrency) capacity so that this doesn't
            // grow too large while the system is under-utilised.
            let ideal_queue_capacity = stats.concurrency as f64
                * ((self.target / stats.average_latency_at_capacity) - 1.0);
            // The float-to-int cast saturates (negative and NaN become 0,
            // infinity becomes `usize::MAX`), the clamp then keeps the queue at
            // least 1 request long and within what the semaphore supports.
            let desired_queue_capacity =
                (ideal_queue_capacity.floor() as usize).clamp(1, max_queue_capacity);

            // Adjust the semaphore capacity by adding or acquiring many permits.
            // If acquiring permits fails we can return overload and let the next
            // request recompute the queue capacity.
            match desired_queue_capacity.cmp(&stats.queue_capacity) {
                Ordering::Less => {
                    // Cannot truncate: both capacities are bounded by
                    // `max_queue_capacity`, which fits in a `u32`.
                    let excess = (stats.queue_capacity - desired_queue_capacity) as u32;
                    match self.available_queue.try_acquire_many(excess) {
                        Ok(permits) => permits.forget(),
                        Err(TryAcquireError::NoPermits) => return None,
                        Err(TryAcquireError::Closed) => panic!("queue semaphore closed?"),
                    }
                }
                Ordering::Equal => {}
                Ordering::Greater => self
                    .available_queue
                    .add_permits(desired_queue_capacity - stats.queue_capacity),
            }
            stats.queue_capacity = desired_queue_capacity;
        }

        // Finally get our queue permit, if this fails then the queue is full
        // and we need to bail out. The permit is released when this function
        // returns, i.e. once we have left the queue.
        let _queue_permit = match self.available_queue.clone().try_acquire_owned() {
            Ok(queue_permit) => queue_permit,
            Err(TryAcquireError::NoPermits) => return None,
            Err(TryAcquireError::Closed) => panic!("queue semaphore closed?"),
        };

        // We're in the queue now so wait until we get ourselves a concurrency permit.
        let concurrency_permit = self
            .available_concurrency
            .clone()
            .acquire_owned()
            .await
            .unwrap();

        Some(concurrency_permit)
    }

    /// Register a completed call of the inner service, providing the latency to
    /// update the statistics.
    ///
    /// `permit` is the concurrency permit the request was holding, or `None`
    /// for a pass-through request that ran without one. When the concurrency
    /// limit is lowered the permit is forgotten instead of being returned to
    /// the semaphore; in every other case it is released when this function
    /// returns.
    fn stop(&self, elapsed: Duration, permit: Option<OwnedSemaphorePermit>) {
        let elapsed = elapsed.as_secs_f64();

        // This function solely updates the stats (and is not async) so hold the
        // lock for the entire function.
        let mut stats = self.stats.lock().expect("To be able to lock stats");

        let available_permits = self.available_concurrency.available_permits();
        // Have some leeway on what "at max concurrency" means as you might
        // otherwise never see this condition at large concurrency values.
        let at_max_concurrency = available_permits <= usize::max(1, stats.concurrency / 10);

        // Update the average latency using the EWMA algorithm.
        stats.average_latency = stats
            .average_latency
            .mul_add(1.0 - self.ewma_param, self.ewma_param * elapsed);

        if at_max_concurrency {
            stats.average_latency_at_capacity = stats
                .average_latency_at_capacity
                .mul_add(1.0 - self.ewma_param, self.ewma_param * elapsed);
        }

        // Only ever change max concurrency if we're at the limit as we need
        // measurements to have happened at the current limit.
        // Also, introduce a max rate of change that's somewhat magically
        // related to the latency and ewma parameter to prevent this from
        // changing too quickly.
        if stats.last_changed.elapsed().as_secs_f64()
            > (stats.average_latency / self.ewma_param) / 10.0
            && at_max_concurrency
        {
            // Plausibly should be using average latency at capacity here and
            // stats.concurrency but this appears to work. It might do weird
            // things if it's been running under capacity for a while then spikes.
            let current_concurrency = stats.concurrency.saturating_sub(available_permits);
            let throughput = current_concurrency as f64 / stats.average_latency;
            // Was the throughput better or worse than it was previously.
            let negative_gradient = (throughput > stats.previous_throughput)
                ^ (current_concurrency > stats.previous_concurrency);
            if negative_gradient || (stats.average_latency > self.target) {
                // Don't reduce concurrency below 1 or everything stops.
                if stats.concurrency > 1 {
                    // Negative gradient so decrease concurrency by taking one
                    // permit out of circulation. `forget_permits` can only
                    // remove idle permits, and at max concurrency there
                    // usually are none, so forget the permit this request
                    // holds. Without a held permit fall back to
                    // `forget_permits` and only book the decrease if a permit
                    // was really removed, so `stats.concurrency` always
                    // matches the size of the semaphore.
                    let removed = match permit {
                        Some(held) => {
                            held.forget();
                            true
                        }
                        None => self.available_concurrency.forget_permits(1) == 1,
                    };

                    if removed {
                        stats.concurrency -= 1;

                        // Adjust the average latency assuming that the change in
                        // concurrency doesn't affect the service latency, which is
                        // closer to the truth than the latency not changing.
                        let latency_factor =
                            stats.concurrency as f64 / (stats.concurrency as f64 + 1.0);
                        stats.average_latency *= latency_factor;
                        stats.average_latency_at_capacity *= latency_factor;
                    }
                }
            } else {
                self.available_concurrency.add_permits(1);
                stats.concurrency += 1;

                // Adjust the average latency assuming that the change in
                // concurrency doesn't affect the service latency, which is
                // closer to the truth than the latency not changing.
                let latency_factor = stats.concurrency as f64 / (stats.concurrency as f64 - 1.0);
                stats.average_latency *= latency_factor;
                stats.average_latency_at_capacity *= latency_factor;
            }

            stats.previous_throughput = throughput;
            stats.previous_concurrency = current_concurrency;
            stats.last_changed = Instant::now()
        }
    }
}

/// A load-shedding middleware that wraps an inner service.
///
/// Requests pass through a bounded queue and a concurrency limit, both sized
/// from the statistics kept in the shared [`LoadShedConf`]. A request that
/// does not fit into the queue is answered with
/// [`LoadShedResponse::Overload`] instead of being forwarded.
#[derive(Debug, Clone)]
pub struct LoadShed<Inner> {
    conf: Arc<LoadShedConf>,
    inner: Inner,
}

impl<Inner> LoadShed<Inner> {
    /// Wrap a service with this middleware, using the given shared
    /// configuration and state.
    pub const fn new(inner: Inner, conf: Arc<LoadShedConf>) -> Self {
        Self { inner, conf }
    }

    /// The current average latency of requests through the inner service,
    /// that is ignoring the queue this service adds.
    pub fn average_latency(&self) -> Duration {
        let stats = self.conf.stats.lock().expect("To be able to lock stats");
        Duration::from_secs_f64(stats.average_latency)
    }

    /// The current maximum concurrency of requests to the inner service.
    pub fn concurrency(&self) -> usize {
        self.conf
            .stats
            .lock()
            .expect("To be able to lock stats")
            .concurrency
    }

    /// The current maximum capacity of this service (including the queue).
    pub fn queue_capacity(&self) -> usize {
        let stats = self.conf.stats.lock().expect("To be able to lock stats");
        stats.concurrency + stats.queue_capacity
    }

    /// The current number of requests that have been accepted by this service.
    pub fn queue_len(&self) -> usize {
        let stats = self.conf.stats.lock().expect("To be able to lock stats");
        // Saturate rather than underflow should a semaphore ever report more
        // free permits than the recorded capacity.
        let current_concurrency = stats
            .concurrency
            .saturating_sub(self.conf.available_concurrency.available_permits());
        let current_queue = stats
            .queue_capacity
            .saturating_sub(self.conf.available_queue.available_permits());

        current_concurrency + current_queue
    }
}

/// Either a response from the wrapped service or a marker that the request
/// was shed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadShedResponse<T> {
    /// A response from the inner service.
    Inner(T),
    /// The request was shed due to overload.
    Overload,
}

/// Boxed, sendable future used as the response future of the services here.
type BoxFuture<Output> = Pin<Box<dyn Future<Output = Output> + Send>>;

impl<Request, Inner> Service<Request> for LoadShed<Inner>
where
    Request: Send + 'static,
    Inner: Service<Request> + Clone + Send + 'static,
    Inner::Future: Send,
{
    type Response = LoadShedResponse<Inner::Response>;
    type Error = Inner::Error;
    type Future = BoxFuture<Result<Self::Response, Self::Error>>;

    /// Always ready because there's a queue between this service and the inner one.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Queues the request, forwards it to the inner service once a
    /// concurrency permit is available and records its latency. Resolves to
    /// [`LoadShedResponse::Overload`] without calling the inner service when
    /// the queue is full and the pass-through budget is used up.
    fn call(&mut self, req: Request) -> Self::Future {
        // We're fine to use the clone because inner hasn't been polled to
        // readiness yet.
        let inner = self.inner.clone();
        let conf = self.conf.clone();
        // Sequence number of this request, taken eagerly so it reflects the
        // order of `call`s rather than the order in which futures get polled.
        let requests = conf
            .requests
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

        Box::pin(async move {
            let permit = conf.start().await;
            // If there's no permit & we're past initial passthrough count - then do load shedding.
            if permit.is_none() && requests >= conf.passthrough_count {
                return Ok(LoadShedResponse::Overload);
            }

            let start = Instant::now();
            // The elapsed time includes waiting for readiness which should help
            // us stay under any upstream concurrency limiters.
            let response = inner.oneshot(req).await;
            // Hand the permit over so that a concurrency decrease can retire
            // it; otherwise it is released when `stop` returns.
            conf.stop(start.elapsed(), permit);
            Ok(LoadShedResponse::Inner(response?))
        })
    }
}

/// [`Layer`] that puts a [`LoadShed`] middleware in front of a service.
///
/// The layer owns one [`LoadShedConf`]; see [`LoadShed`] for details of the
/// load shedding algorithm.
#[derive(Debug, Clone)]
pub struct LoadShedLayer(Arc<LoadShedConf>);

impl LoadShedLayer {
    /// Builds a layer from the load shedding parameters.
    ///
    /// * `ewma_param` - parameter of the exponentially weighted moving
    ///   average used to track the current average latency.
    /// * `target` - the target average latency.
    /// * `passthrough_count` - number of initial requests to pass without
    ///   shedding.
    pub fn new(ewma_param: f64, target: Duration, passthrough_count: u64) -> Self {
        // The configuration works with the target expressed in seconds.
        let target_secs = target.as_secs_f64();
        let shared = LoadShedConf::new(ewma_param, target_secs, passthrough_count);

        Self(Arc::new(shared))
    }
}

impl<Inner> Layer<Inner> for LoadShedLayer {
    type Service = LoadShed<Inner>;

    fn layer(&self, inner: Inner) -> Self::Service {
        LoadShed::new(inner, self.0.clone())
    }
}

#[cfg(test)]
mod test {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use tokio_util::task::TaskTracker;

    use super::*;
    use crate::Error;

    /// Inner service stub: sleeps for the requested duration, then succeeds.
    #[derive(Debug, Clone)]
    struct StubService;

    impl Service<Duration> for StubService {
        type Response = ();
        type Error = Error;
        type Future = BoxFuture<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, req: Duration) -> Self::Future {
            Box::pin(async move {
                tokio::time::sleep(req).await;
                Ok(())
            })
        }
    }

    /// Spawns `total` concurrent 10ms requests against clones of `shedder`,
    /// waits for all of them and returns how many were answered with
    /// `Overload`.
    async fn count_overloads(shedder: &LoadShed<StubService>, total: usize) -> usize {
        let overloads = Arc::new(AtomicUsize::new(0));
        let tasks = TaskTracker::new();

        for _ in 0..total {
            let service = shedder.clone();
            let overloads = overloads.clone();

            tasks.spawn(async move {
                let resp = service.oneshot(Duration::from_millis(10)).await.unwrap();
                if matches!(resp, LoadShedResponse::Overload) {
                    overloads.fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        tasks.close();
        tasks.wait().await;
        overloads.load(Ordering::SeqCst)
    }

    #[tokio::test]
    async fn test_little_loadshedder() {
        let mut shedder =
            LoadShedLayer::new(0.9, Duration::from_millis(1), 100).layer(StubService);

        // A burst of 100 concurrent high-latency requests: all of them fall
        // within the passthrough count, so none may be shed.
        assert_eq!(count_overloads(&shedder, 100).await, 0);

        // Sequential requests are never shed, no matter the latency.
        for _ in 0..10 {
            let resp = shedder.call(Duration::from_millis(10)).await.unwrap();
            assert_eq!(resp, LoadShedResponse::Inner(()));
        }

        // A burst of 10 concurrent high-latency requests once the passthrough
        // count is used up: 8 of them should be shed.
        assert_eq!(count_overloads(&shedder, 10).await, 8);
    }
}
}