Async Priority Limiter
This library exports the Limiter struct, which throttles prioritised tasks by capping the number of concurrent tasks and enforcing a minimum interval between tasks. Limits can be applied on up to two levels: a default level and an optional per-key level.
Basic use
This example sets up a simple queue in which tasks are prioritised by integers, where a higher value means a higher priority. Once a task has been popped from the queue and has started waiting for its permit, it is next in line for execution; until then, it may be overtaken by a higher-priority task.
# tokio_test::block_on(async {
use async_priority_limiter::Limiter;
use futures::future::join_all;
use std::time::{Duration, Instant};
use tokio::time::sleep;
let limiter = Limiter::new::<String>(1);
limiter
.set_default_interval(Some(Duration::from_secs(1)))
.await;
let start = Instant::now();
let fut_a = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('A', task_started)
},
10,
)
.await;
let fut_b = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('B', task_started)
},
10,
)
.await;
let fut_c = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('C', task_started)
},
20,
)
.await;
let results = join_all([fut_a, fut_b, fut_c]).await;
assert_eq!(results, [('A', 0), ('B', 2), ('C', 1)]);
# });
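The start times in the assertion above can be reasoned about with a small std-only model. This is a sketch of the rule described in the text, not the crate's implementation. It assumes a concurrency of one, that the first task queued is claimed immediately, that the interval is measured between task starts, that task run time is shorter than the interval, and that equal priorities are served in queue order. The names `Task` and `simulate` are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical model of a queued task: a label and an integer priority.
#[derive(Debug, Clone, Copy)]
pub struct Task {
    pub label: char,
    pub priority: i32,
}

/// Returns `(label, start second)` for each task, in the order the tasks were queued.
pub fn simulate(tasks: &[Task], interval_secs: u64) -> Vec<(char, u64)> {
    // The first task is assumed to be claimed at once and therefore starts at second 0.
    let mut starts = vec![0u64; tasks.len()];

    // Everything queued afterwards waits in a max-heap: highest priority first,
    // and for equal priorities the earliest queued index first.
    let mut heap = BinaryHeap::new();
    for (index, task) in tasks.iter().enumerate().skip(1) {
        heap.push((task.priority, Reverse(index)));
    }

    // Each popped task starts one interval after the previous start.
    let mut now = 0u64;
    while let Some((_, Reverse(index))) = heap.pop() {
        now += interval_secs;
        starts[index] = now;
    }

    tasks.iter().zip(starts).map(|(task, start)| (task.label, start)).collect()
}
```

Under these assumptions, queuing A (priority 10), B (priority 10) and C (priority 20) with a one second interval gives A at second 0, C at second 1 and B at second 2, matching the assertion in the example.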
Using keys
All relevant methods have a variant that accepts a "key". The key refers to a second layer of limits. Say I wanted to send a single request at a time, at most once every two seconds, but also limit a specific endpoint to one call every three seconds:
Note: the default limits act as a baseline and are always awaited alongside the key limits. Adding a key limit that is faster than the baseline therefore does nothing!
# tokio_test::block_on(async {
use async_priority_limiter::Limiter;
use futures::future::join_all;
use std::time::{Duration, Instant};
use tokio::time::sleep;
let limiter = Limiter::new(1);
limiter
.set_default_interval(Some(Duration::from_secs(2)))
.await;
limiter
.set_interval_by_key(Some(Duration::from_secs(3)), "blargh")
.await;
let start = Instant::now();
let fut_a = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('A', task_started)
},
10,
)
.await;
let fut_b = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('B', task_started)
},
10,
)
.await;
let fut_c = limiter
.queue_by_key(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('C', task_started)
},
10,
"blargh",
)
.await;
let fut_d = limiter
.queue_by_key(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('D', task_started)
},
20,
"blargh",
)
.await;
let results = join_all([fut_a, fut_b, fut_c, fut_d]).await;
assert_eq!(results, [('A', 0), ('B', 4), ('C', 6), ('D', 2)]);
# });
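The interaction between the baseline and a key limit can be modelled the same way. The sketch below is std-only and is not the crate's implementation. It assumes that tasks are popped by priority (queue order on ties), that a popped task starts at the later of the baseline's and its key's next free second, and that the first task queued is claimed immediately. `KeyedTask` and `simulate_keyed` are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical model of a queued task with an optional key.
#[derive(Debug, Clone, Copy)]
pub struct KeyedTask {
    pub label: char,
    pub priority: i32,
    pub key: Option<&'static str>,
}

/// Returns `(label, start second)` for each task, in the order the tasks were queued.
pub fn simulate_keyed(
    tasks: &[KeyedTask],
    baseline_secs: u64,
    key_intervals: &HashMap<&'static str, u64>,
) -> Vec<(char, u64)> {
    // Execution order: the first task is claimed at once, the rest by priority,
    // with the earliest queued index winning ties.
    let mut order: Vec<usize> = Vec::new();
    if !tasks.is_empty() {
        order.push(0);
    }
    let mut heap = BinaryHeap::new();
    for (index, task) in tasks.iter().enumerate().skip(1) {
        heap.push((task.priority, Reverse(index)));
    }
    while let Some((_, Reverse(index))) = heap.pop() {
        order.push(index);
    }

    let mut starts = vec![0u64; tasks.len()];
    let mut baseline_ready = 0u64;
    let mut key_ready: HashMap<&'static str, u64> = HashMap::new();
    for index in order {
        let task = &tasks[index];
        // The baseline is always awaited; the key limit can only delay further.
        let mut start = baseline_ready;
        if let Some(key) = task.key {
            if let Some(&ready) = key_ready.get(key) {
                start = start.max(ready);
            }
            if let Some(&interval) = key_intervals.get(key) {
                key_ready.insert(key, start + interval);
            }
        }
        starts[index] = start;
        baseline_ready = start + baseline_secs;
    }

    tasks.iter().zip(starts).map(|(task, start)| (task.label, start)).collect()
}
```

With a two second baseline and a three second interval for "blargh", this model yields A at 0, D at 2, B at 4 and C at 6. Lowering the key interval to one second produces the same schedule, which illustrates why a key limit faster than the baseline has no effect.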
Features
reqwest
Enables the send_limited and send_limited_by_key methods for the reqwest::RequestBuilder struct, and the update_limiter_by_retry_after_header and update_limiter_by_key_and_retry_after_header methods for the reqwest::Response struct:
let client = reqwest::Client::new();
let limits = async_priority_limiter::Limiter::new(1);
let response = client
.get("herpderp")
.send_limited(&limits)
.await?
// If the status is 429 and the Retry-After header contains a valid value, the limiter is updated accordingly, setting a hard timeout on requests:
.update_limiter_by_retry_after_header(&limits)
.await;
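To make the condition in the comment above concrete, here is a minimal std-only sketch of how a timeout could be derived from a response. It is an illustration under stated assumptions, not the crate's code: `retry_after_delay` is a hypothetical helper, and it only handles the delay-in-seconds form of Retry-After. The HTTP-date form, which the header also permits, is treated as invalid here.

```rust
use std::time::Duration;

/// Hypothetical helper: returns how long to pause requests, if at all.
/// `status` is the HTTP status code and `retry_after` the raw header value.
pub fn retry_after_delay(status: u16, retry_after: Option<&str>) -> Option<Duration> {
    // Only a 429 (Too Many Requests) response triggers a timeout in this sketch.
    if status != 429 {
        return None;
    }
    // Accept a non-negative whole number of seconds; anything else is ignored.
    let seconds: u64 = retry_after?.trim().parse().ok()?;
    Some(Duration::from_secs(seconds))
}
```

A 429 response with `Retry-After: 120` would give a two minute pause, while any other status or an unparsable value leaves the limiter untouched.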
open_ai
Enables the update_limiter_by_open_ai_ratelimit_headers and update_limiter_by_open_ai_ratelimit_headers_by_key methods of the reqwest::Response struct.
This prevents new requests from being made once the remaining tokens or requests reach zero or less, until the counter resets.
Requires the reqwest feature.
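The rule described for this feature can be sketched without any HTTP code. The example below is a std-only model, not the crate's implementation. `Counter` and `pause_for` are hypothetical names, and the values are assumed to have already been read from the rate-limit headers. It also assumes that, when both counters are exhausted, requests wait for the later reset.

```rust
use std::time::Duration;

/// Hypothetical snapshot of one rate-limit counter (requests or tokens).
#[derive(Debug, Clone, Copy)]
pub struct Counter {
    /// Remaining budget; zero or less means the budget is exhausted.
    pub remaining: i64,
    /// Time until the counter resets.
    pub resets_in: Duration,
}

/// Returns how long new requests should be held back, or `None` if both
/// counters still have budget left.
pub fn pause_for(requests: Counter, tokens: Counter) -> Option<Duration> {
    [requests, tokens]
        .iter()
        .filter(|counter| counter.remaining <= 0)
        .map(|counter| counter.resets_in)
        .max()
}
```

For example, zero remaining requests with a one second reset and plenty of remaining tokens would hold new requests for one second.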