//! Adaptively schedule blocking work
//!
//! The `adaptive` module defines types that help you schedule *blocking work* (that is, not
//! `async`) adaptively. If the work is cheap, we save on the overhead of things like
//! [`spawn_blocking`](tokio::task::spawn_blocking) by skipping moving the work to a thread
//! and instead running it inline in the [`poll`](std::future::Future::poll) implementation.
//!
//! The most important type in this module is [`AdaptiveFuture`][AdaptiveFuture], which wraps an [`FnOnce`](FnOnce)
//! representing the blocking work you want to perform.
//!
//! ## Usage
//! ```
//! use impedance::adaptive::{AdaptiveFuture, Token};
//! use once_cell::sync::Lazy;
//!
//! static REQUEST_TOKEN: Lazy<Token> = Lazy::new(|| Token::new());
//!
//! fn deserialize(s: &str) -> i32 {
//!     // could be expensive to deserialize!
//!     1
//! }
//!
//! async fn send_request() -> String {
//!     // We are usually doing io here
//!     "gus".to_string()
//! }
//!
//! async fn make_request() -> i32 {
//!     let response = send_request().await;
//!     AdaptiveFuture::new(*REQUEST_TOKEN, move || deserialize(&response)).await
//! }
//! ```
//!
//! ## Scheduling Scheme
//! `AdaptiveFuture` decides when to inline work based on the last *wall-time* of the work
//! it has performed. The granularity of this *wall-time* is based on the [`Token`](Token) passed
//! into the constructor. This allows the user to have fine-grained control, based on their
//! knowledge of how the *possibly-expensive* CPU work they are guarding with an `AdaptiveFuture`
//! will perform in different parts of their program.
//!
//! For more information about how to construct `Token`s and the various options, see
//! [`Token`](Token).
//! The above example shows the common-case default of using a
//! `static` *unique* `Token` configured to use the default cutoff time ([`BLOCKING_CUTOFF_DURATION`][BLOCKING_CUTOFF_DURATION]).
//!
//! More complex scheduling schemes may be available in the future.

use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

mod token;
pub use token::Token;

mod core;
use self::core::TimedBlockingFuture;

/// See [here](https://github.com/guswynn/impedance/blob/main/benches/comparisons.rs#L66-L71)
/// to get a baseline cost of [`spawn_blocking`](tokio::task::spawn_blocking) (on your machine).
///
/// Currently this is set to `100_000` nanoseconds. This may change, or may need to be made
/// configurable for your use case.
pub const BLOCKING_CUTOFF_DURATION: Duration = Duration::from_nanos(100_000);

/// A [`Future`][Future] representing *blocking work*
///
/// It either
/// 1. Runs the work inline in its [`poll`](std::future::Future::poll) implementation
/// 2. Schedules the work on another thread using [`spawn_blocking`](tokio::task::spawn_blocking)
///
/// See *[the module documentation](self)* for usage examples.
#[pin_project]
pub struct AdaptiveFuture<O, F> {
    #[pin]
    inner: TimedBlockingFuture<O, F>,
}

impl<O, F: FnOnce() -> O> AdaptiveFuture<O, F> {
    /// Create a new `AdaptiveFuture` that will adaptively schedule blocking work
    /// inline or on a thread ([`spawn_blocking`](tokio::task::spawn_blocking)) associated
    /// with the [`Token`](Token)
    pub fn new(token: Token, future: F) -> Self {
        AdaptiveFuture {
            inner: TimedBlockingFuture::new(token, BLOCKING_CUTOFF_DURATION, future),
        }
    }
}

impl<O: Send + 'static, F: FnOnce() -> O + Send + 'static> Future for AdaptiveFuture<O, F> {
    type Output = O;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        this.inner.poll(cx)
    }
}
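
// ---------------------------------------------------------------------------
// The tests below are an illustrative sketch, not part of the original source.
// They assume `tokio` (with the `macros` and `rt` features) and `once_cell`
// are available as dev-dependencies, mirroring the module-level doc example.
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::{AdaptiveFuture, Token};
    use once_cell::sync::Lazy;

    // One unique token per "shape" of work, so wall-time measurements are
    // pooled only across comparable closures (see the scheduling scheme in
    // the module docs).
    static CHEAP_TOKEN: Lazy<Token> = Lazy::new(|| Token::new());
    static EXPENSIVE_TOKEN: Lazy<Token> = Lazy::new(|| Token::new());

    #[tokio::test]
    async fn cheap_work_resolves() {
        // Cheap work: expected to finish well under `BLOCKING_CUTOFF_DURATION`,
        // so after the first timed run it should be executed inline in `poll`.
        let value = AdaptiveFuture::new(*CHEAP_TOKEN, || 2 + 2).await;
        assert_eq!(value, 4);
    }

    #[tokio::test]
    async fn expensive_work_resolves() {
        // Expensive work: once observed to exceed the cutoff, later futures
        // using the same token should be routed to `spawn_blocking`. Either
        // way, the awaited output is the closure's return value.
        let value = AdaptiveFuture::new(*EXPENSIVE_TOKEN, || {
            std::thread::sleep(std::time::Duration::from_millis(1));
            42
        })
        .await;
        assert_eq!(value, 42);
    }
}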