Async Priority Limiter
This library exports the Limiter struct. It throttles prioritised tasks by limiting the max concurrent tasks and minimum time between tasks, with up to two levels based on keys.
Basic use
This example sets up a simple queue where tasks are prioritised by integers. Once a task has been popped from the queue and has started waiting for its permit it will be next in line for execution - if not it may be overtaken by a higher priority task.
# tokio_test::block_on(async {
use async_priority_limiter::Limiter;
use futures::future::join_all;
use std::time::{Duration, Instant};
use tokio::time::sleep;
let limiter = Limiter::default();
limiter
.set_default_interval(Some(Duration::from_secs(1)))
.await;
let start = Instant::now();
let fut_a = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('A', task_started)
},
10,
)
.await;
let fut_b = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('B', task_started)
},
10,
)
.await;
let fut_c = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('C', task_started)
},
20,
)
.await;
let results = join_all([fut_a, fut_b, fut_c]).await;
assert_eq!(results, [('A', 0), ('B', 2), ('C', 1)]);
# });
Using keys
All relevant methods have a variant accepting a "key". This key referes to a second layer of limits. Say I wanted to keep sending a single request at a time, at most once every two seconds, but also limit a specific endpoint to a call once every three seconds:
Note: the default limits act as a baseline and is always awaited along the key limits. Adding a key limit that is faster than the baseline therefore does nothing!
# tokio_test::block_on(async {
use async_priority_limiter::Limiter;
use futures::future::join_all;
use std::time::{Duration, Instant};
use tokio::time::sleep;
let limiter = Limiter::new(1);
limiter
.set_default_interval(Some(Duration::from_secs(2)))
.await;
limiter
.set_interval_by_key(Some(Duration::from_secs(5)), "blargh")
.await;
let start = Instant::now();
let fut_a = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('A', task_started)
},
10,
)
.await;
let fut_b = limiter
.queue(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('B', task_started)
},
10,
)
.await;
let fut_c = limiter
.queue_by_key(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('C', task_started)
},
10, "blargh", )
.await;
let fut_d = limiter
.queue_by_key(
async move {
let task_started = Instant::now().duration_since(start).as_secs();
sleep(Duration::from_millis(100)).await;
('D', task_started)
},
20,
"blargh",
)
.await;
let results = join_all([fut_a, fut_b, fut_c, fut_d]).await;
assert_eq!(results, [('A', 0), ('B', 4), ('C', 6), ('D', 2)]);
# });
Features
reqwest
Enables the send_limiter
and send_limited_by_key
methods for the reqwest::RequestBuilder
struct, and the update_limiter_by_retry_after_header
and update_limiter_by_key_and_retry_after_header
for the reqwest::Response
struct:
let client = reqwest::Client::new();
let limits = async_priority_limiter::Limiter::new(1);
let response = client
.get("herpderp")
.send_limited(&limits)
.await?
// If the status is 429 and the Retry-After-header contained a valid value, the limiter will be updated accordingly, setting a hard timeout on requests:
.update_limiter_by_retry_after_header(&limits)
.await;
open_ai
Enables the update_limiter_by_open_ai_ratelimit_headers
and update_limiter_by_open_ai_ratelimit_headers_by_key
methods of the reqwest::Response
struct.
This prevents new requests from being made if remaining tokens or requests reach zero or less until the counter resets.
Requires the reqwest
feature.