pub struct LocalSet { /* private fields */ }

A set of tasks which are executed on the same thread.

In some cases, it is necessary to run one or more futures that do not implement Send and thus are unsafe to send between threads. In these cases, a local task set may be used to schedule one or more !Send futures to run together on the same thread.

For example, the following code will not compile:

use std::rc::Rc;

#[tokio::main]
async fn main() {
    // `Rc` does not implement `Send`, and thus may not be sent between
    // threads safely.
    let nonsend_data = Rc::new("my nonsend data...");

    let nonsend_data = nonsend_data.clone();
    // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
    // Since `tokio::spawn` requires the spawned future to implement `Send`, this
    // will not compile.
    tokio::spawn(async move {
        println!("{}", nonsend_data);
        // ...
    }).await.unwrap();
}
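
The error comes from the `Send` bound alone, which can be seen without tokio. The following is a minimal std-only sketch; `assert_send` and `read_on_other_thread` are hypothetical helper names chosen for illustration, not part of any library. It shows that `Arc` satisfies the bound that `Rc` fails. Switching to `Arc` is an option only when the data can be made thread-safe; a local task set covers the cases where it cannot.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: compiles only for values that may be moved to another thread.
fn assert_send<T: Send>(_: &T) {}

/// Moves shared data into another thread and returns what that thread read.
fn read_on_other_thread() -> String {
    let data = Arc::new("my sendable data...");
    assert_send(&data);

    // The `Rc` equivalent is rejected at compile time because `Rc` is `!Send`:
    // let rc = std::rc::Rc::new("my nonsend data...");
    // assert_send(&rc);

    let data2 = Arc::clone(&data);
    // `thread::spawn` has the same kind of `Send` requirement as `tokio::spawn`.
    thread::spawn(move || data2.to_string()).join().unwrap()
}
```

Calling `read_on_other_thread()` from `main` returns the string read on the spawned thread.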

Use with run_until

To spawn !Send futures, we can use a local task set to schedule them on the thread calling Runtime::block_on. When running inside of the local task set, we can use task::spawn_local, which can spawn !Send futures. For example:

use std::rc::Rc;
use tokio::task;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("my nonsend data...");

    // Construct a local task set that can run `!Send` futures.
    let local = task::LocalSet::new();

    // Run the local task set.
    local.run_until(async move {
        let nonsend_data = nonsend_data.clone();
        // `spawn_local` ensures that the future is spawned on the local
        // task set.
        task::spawn_local(async move {
            println!("{}", nonsend_data);
            // ...
        }).await.unwrap();
    }).await;
}

Note: The run_until method can only be used in #[tokio::main], #[tokio::test] or directly inside a call to Runtime::block_on. It cannot be used inside a task spawned with tokio::spawn.

Awaiting a LocalSet

Additionally, a LocalSet itself implements Future, completing when all tasks spawned on the LocalSet complete. This can be used to run several futures on a LocalSet and drive the whole set until they complete. For example:

use tokio::{task, time};
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("world");
    let local = task::LocalSet::new();

    let nonsend_data2 = nonsend_data.clone();
    local.spawn_local(async move {
        // ...
        println!("hello {}", nonsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", nonsend_data)
    });

    // ...

    local.await;
}

Note: Awaiting a LocalSet can only be done inside #[tokio::main], #[tokio::test] or directly inside a call to Runtime::block_on. It cannot be used inside a task spawned with tokio::spawn.

Use inside tokio::spawn

The two methods mentioned above cannot be used inside tokio::spawn, so spawning !Send futures from inside tokio::spawn needs a different approach: create the LocalSet somewhere else and communicate with it using an mpsc channel.

The following example puts the LocalSet inside a new thread.

use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;

// This enum describes the tasks you want to spawn. Here we include
// two simple examples. The oneshot channel allows sending a response
// to the spawner.
#[derive(Debug)]
enum Task {
    PrintNumber(u32),
    AddOne(u32, oneshot::Sender<u32>),
}

#[derive(Clone)]
struct LocalSpawner {
    send: mpsc::UnboundedSender<Task>,
}

impl LocalSpawner {
    pub fn new() -> Self {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    tokio::task::spawn_local(run_task(new_task));
                }
                // If the while loop returns, then all the LocalSpawner
                // objects have been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Self {
            send,
        }
    }

    pub fn spawn(&self, task: Task) {
        self.send.send(task).expect("Thread with LocalSet has shut down.");
    }
}

// This task may do `!Send` work. We use printing a number as an example,
// but it could be anything.
//
// `Task` is an enum so that it can support spawning many different kinds
// of operations.
async fn run_task(task: Task) {
    match task {
        Task::PrintNumber(n) => {
            println!("{}", n);
        },
        Task::AddOne(n, response) => {
            // We ignore failures to send the response.
            let _ = response.send(n + 1);
        },
    }
}

#[tokio::main]
async fn main() {
    let spawner = LocalSpawner::new();

    let (send, response) = oneshot::channel();
    spawner.spawn(Task::AddOne(10, send));
    let eleven = response.await.unwrap();
    assert_eq!(eleven, 11);
}
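
The reason the channel approach works is that only the request crosses threads, while the `!Send` state stays on the thread that owns it. The following std-only sketch shows that shape in isolation, using a plain thread and `std::sync::mpsc` in place of a runtime and a `LocalSet`. `AddRequest`, `start_worker` and `add` are hypothetical names for illustration only.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Hypothetical request: a number to add plus a channel for the reply.
/// Every field is `Send`, so the request itself can cross threads.
struct AddRequest {
    amount: u32,
    reply: mpsc::Sender<u32>,
}

/// Starts a worker thread that owns `!Send` state and returns a sender for it.
fn start_worker() -> mpsc::Sender<AddRequest> {
    let (send, recv) = mpsc::channel::<AddRequest>();
    thread::spawn(move || {
        // `Rc<Cell<_>>` is `!Send`; it is created on this thread and never leaves it.
        let total = Rc::new(Cell::new(0u32));
        // The loop ends once every `Sender<AddRequest>` has been dropped.
        for request in recv {
            total.set(total.get() + request.amount);
            // Ignore the error if the requester has stopped listening.
            let _ = request.reply.send(total.get());
        }
    });
    send
}

/// Sends one request and blocks until the worker replies with the running total.
fn add(worker: &mpsc::Sender<AddRequest>, amount: u32) -> u32 {
    let (reply, response) = mpsc::channel();
    worker
        .send(AddRequest { amount, reply })
        .expect("worker thread has shut down");
    response.recv().expect("worker dropped the reply channel")
}
```

A caller would create the worker once with `start_worker()` and then call `add(&worker, n)` from any thread. Unlike the tokio version above, this sketch blocks the calling thread while waiting for the reply.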

Implementations

impl LocalSet

pub fn new() -> LocalSet

Returns a new local task set.

pub fn enter(&self) -> LocalEnterGuard

Enters the context of this LocalSet.

While the returned guard is alive, the spawn_local free function spawns tasks on this LocalSet, that is, on the LocalSet whose context you are inside.

pub fn spawn_local<F>(&self, future: F) -> JoinHandle<<F as Future>::Output>
where F: Future + 'static, <F as Future>::Output: 'static,

Spawns a !Send task onto the local task set.

This task is guaranteed to be run on the current thread.

Unlike the free function spawn_local, this method may be used to spawn local tasks when the LocalSet is not running. The provided future will start running once the LocalSet is next started, even if you don’t await the returned JoinHandle.

Examples

use tokio::task;

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();

    // Spawn a future on the local set. This future will be run when
    // we call `run_until` to drive the task set.
    local.spawn_local(async {
        // ...
    });

    // Run the local task set.
    local.run_until(async move {
        // ...
    }).await;

    // When `run_until` finishes, we can spawn _more_ futures, which will
    // run in subsequent calls to `run_until`.
    local.spawn_local(async {
        // ...
    });

    local.run_until(async move {
        // ...
    }).await;
}
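
The queue-then-drive behavior described above can be pictured with a toy model. The following std-only sketch is an illustration under simplifying assumptions, not tokio's implementation: `ToyLocalSet`, `NoopWake` and `demo` are hypothetical names, and the set re-polls every task in a loop instead of waiting for wakeups. It shows that futures stored without a `Send` bound may hold an `Rc`, and that queued tasks do nothing until the set is driven.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Waker that does nothing; the toy set below simply re-polls every task.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical toy stand-in for a local task set.
/// The boxed futures carry no `Send` bound, so they may hold an `Rc`.
struct ToyLocalSet {
    tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
}

impl ToyLocalSet {
    fn new() -> Self {
        Self { tasks: Vec::new() }
    }

    /// Queues a future; nothing runs until `run_to_completion` is called.
    fn spawn_local(&mut self, fut: impl Future<Output = ()> + 'static) {
        self.tasks.push(Box::pin(fut));
    }

    /// Polls every queued task on the current thread until all have finished.
    /// A task that stays pending forever would make this loop spin.
    fn run_to_completion(&mut self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        while !self.tasks.is_empty() {
            // Keep only the tasks that are still pending after this poll.
            self.tasks
                .retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        }
    }
}

/// Spawns two `!Send` tasks that share an `Rc` and returns what they logged.
fn demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let mut local = ToyLocalSet::new();

    let (a, b) = (log.clone(), log.clone());
    local.spawn_local(async move {
        a.borrow_mut().push("hello world".to_string());
    });
    local.spawn_local(async move {
        b.borrow_mut().push("goodbye world".to_string());
    });

    // The tasks are queued but have not run yet.
    assert!(log.borrow().is_empty());

    local.run_to_completion();
    let out = log.borrow().clone();
    out
}
```

A real local task set parks the thread until a task is woken rather than polling in a loop; the sketch trades that for brevity.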

pub fn block_on<F>(&self, rt: &Runtime, future: F) -> <F as Future>::Output
where F: Future,

Runs a future to completion on the provided runtime, driving any local futures spawned on this task set on the current thread.

This runs the given future on the runtime, blocking until it is complete, and yielding its resolved result. Any tasks or timers which the future spawns internally will be executed on the runtime. The future may also call spawn_local to spawn additional local futures on the current thread.

This method should not be called from an asynchronous context.

Panics

This function panics if the executor is at capacity, if the provided future panics, or if called within an asynchronous execution context.

Notes

Since this function internally calls Runtime::block_on, and drives futures in the local task set inside that call to block_on, the local futures may not use in-place blocking. If a blocking call needs to be issued from a local task, the spawn_blocking API may be used instead.

For example, this will panic:

use tokio::runtime::Runtime;
use tokio::task;

let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::block_in_place(|| {
            // ...
        });
        // ...
    });
    join.await.unwrap();
});

This, however, will not panic:

use tokio::runtime::Runtime;
use tokio::task;

let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::spawn_blocking(|| {
            // ...
        }).await;
        // ...
    });
    join.await.unwrap();
});

pub async fn run_until<F>(&self, future: F) -> <F as Future>::Output
where F: Future,

Runs a future to completion on the local set, returning its output.

This returns a future that runs the given future with a local set, allowing it to call spawn_local to spawn additional !Send futures. Any local futures spawned on the local set will be driven in the background until the future passed to run_until completes. When the future passed to run_until finishes, any local futures which have not completed will remain on the local set, and will be driven on subsequent calls to run_until or when awaiting the local set itself.

Cancel safety

This method is cancel safe when future is cancel safe.

Examples

use tokio::task;

#[tokio::main]
async fn main() {
    task::LocalSet::new().run_until(async {
        task::spawn_local(async move {
            // ...
        }).await.unwrap();
        // ...
    }).await;
}

Trait Implementations

impl Debug for LocalSet

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

impl Default for LocalSet

fn default() -> LocalSet

Returns the “default value” for a type.

impl Drop for LocalSet

fn drop(&mut self)

Executes the destructor for this type.

impl Future for LocalSet

type Output = ()

The type of value produced on completion.

fn poll(self: Pin<&mut LocalSet>, cx: &mut Context<'_>) -> Poll<<LocalSet as Future>::Output>

Attempt to resolve the future to a final value, registering the current task for wakeup if the value is not yet available.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Any for T
where T: Any,

fn into_any(self: Box<T>) -> Box<dyn Any>

fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>

fn type_name(&self) -> &'static str

impl<T> ArchivePointee for T

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.

fn pointer_metadata(_: &<T as ArchivePointee>::ArchivedMetadata) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<F, W, T, D> Deserialize<With<T, W>, D> for F
where W: DeserializeWith<F, T, D>, D: Fallible + ?Sized, F: ?Sized,

fn deserialize(&self, deserializer: &mut D) -> Result<With<T, W>, <D as Fallible>::Error>

Deserializes using the given deserializer.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<F> FutureExt for F
where F: Future + ?Sized,

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
where Self: Unpin,

A convenience for calling Future::poll() on !Unpin types.

fn or<F>(self, other: F) -> Or<Self, F>
where Self: Sized, F: Future<Output = Self::Output>,

Returns the result of self or other future, preferring self if both are ready.

fn race<F>(self, other: F) -> Race<Self, F>
where Self: Sized, F: Future<Output = Self::Output>,

Returns the result of self or other future, with no preference if both are ready.

fn catch_unwind(self) -> CatchUnwind<Self>
where Self: Sized + UnwindSafe,

Catches panics while polling the future.

fn boxed<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>
where Self: Sized + Send + 'a,

Boxes the future and changes its type to dyn Future + Send + 'a.

fn boxed_local<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>>
where Self: Sized + 'a,

Boxes the future and changes its type to dyn Future + 'a.

impl<T> FutureExt for T
where T: Future + ?Sized,

fn map<U, F>(self, f: F) -> Map<Self, F>
where F: FnOnce(Self::Output) -> U, Self: Sized,

Map this future’s output to a different type, returning a new future of the resulting type.

fn map_into<U>(self) -> MapInto<Self, U>
where Self::Output: Into<U>, Self: Sized,

Map this future’s output to a different type, returning a new future of the resulting type.

fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where F: FnOnce(Self::Output) -> Fut, Fut: Future, Self: Sized,

Chain on a computation for when a future finished, passing the result of the future to the provided closure f.

fn left_future<B>(self) -> Either<Self, B>
where B: Future<Output = Self::Output>, Self: Sized,

Wrap this future in an Either future, making it the left-hand variant of that Either.

fn right_future<A>(self) -> Either<A, Self>
where A: Future<Output = Self::Output>, Self: Sized,

Wrap this future in an Either future, making it the right-hand variant of that Either.

fn into_stream(self) -> IntoStream<Self>
where Self: Sized,

Convert this future into a single element stream.

fn flatten(self) -> Flatten<Self>
where Self::Output: Future, Self: Sized,

Flatten the execution of this future when the output of this future is itself another future.

fn flatten_stream(self) -> FlattenStream<Self>
where Self::Output: Stream, Self: Sized,

Flatten the execution of this future when the successful result of this future is a stream.

fn fuse(self) -> Fuse<Self>
where Self: Sized,

Fuse a future such that poll will never again be called once it has completed. This method can be used to turn any Future into a FusedFuture.

fn inspect<F>(self, f: F) -> Inspect<Self, F>
where F: FnOnce(&Self::Output), Self: Sized,

Do something with the output of a future before passing it on.

fn catch_unwind(self) -> CatchUnwind<Self>
where Self: Sized + UnwindSafe,

Catches unwinding panics while polling the future.

fn shared(self) -> Shared<Self>
where Self: Sized, Self::Output: Clone,

Create a cloneable handle to this future where all handles will resolve to the same result.

fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
where Self: Sized,

Turn this future into a future that yields () on completion and sends its output to another future on a separate task.

fn boxed<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>
where Self: Sized + Send + 'a,

Wrap the future in a Box, pinning it.

fn boxed_local<'a>(self) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>>
where Self: Sized + 'a,

Wrap the future in a Box, pinning it.

fn unit_error(self) -> UnitError<Self>
where Self: Sized,

fn never_error(self) -> NeverError<Self>
where Self: Sized,

fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
where Self: Unpin,

A convenience for calling Future::poll on Unpin future types.

fn now_or_never(self) -> Option<Self::Output>
where Self: Sized,

Evaluates and consumes the future, returning the resulting output if the future is ready after the first call to Future::poll.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> FutureExt for T
where T: Future + ?Sized,

fn delay(self, dur: Duration) -> DelayFuture<Self>
where Self: Sized,

Returns a Future that delays execution for a specified time.

fn flatten(self) -> FlattenFuture<Self, <Self::Output as IntoFuture>::Future>
where Self: Sized, Self::Output: IntoFuture,

Flatten out the execution of this future when the result itself can be converted into another future.

fn race<F>(self, other: F) -> Race<Self, F>
where Self: Future + Sized, F: Future<Output = Self::Output>,

Waits for one of two similarly-typed futures to complete.

fn try_race<F, T, E>(self, other: F) -> TryRace<Self, F>
where Self: Future<Output = Result<T, E>> + Sized, F: Future<Output = Self::Output>,

Waits for one of two similarly-typed fallible futures to complete.

fn join<F>(self, other: F) -> Join<Self, F>
where Self: Future + Sized, F: Future,

Waits for two similarly-typed futures to complete.

fn try_join<F, A, B, E>(self, other: F) -> TryJoin<Self, F>
where Self: Future<Output = Result<A, E>> + Sized, F: Future<Output = Result<B, E>>,

Waits for two similarly-typed fallible futures to complete.

fn timeout(self, dur: Duration) -> TimeoutFuture<Self>
where Self: Sized,

Waits for both the future and a timeout; if the timeout completes before the future, it returns a TimeoutError.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<F> IntoFuture for F
where F: Future,

type Output = <F as Future>::Output

The output that the future will produce on completion.

type IntoFuture = F

Which kind of future are we turning this into?

fn into_future(self) -> <F as IntoFuture>::IntoFuture

Creates a future from a value.

impl<T> IntoFuture for T
where T: Future,

type Output = <T as Future>::Output

The type of value produced on completion.

type Future = T

Which kind of future are we turning this into?

fn into_future(self) -> <T as IntoFuture>::Future

Create a future from a value.

impl<T> IntoMustBoxFuture for T
where T: Future + ?Sized,

fn must_box<'a>(self) -> MustBoxFuture<'a, Self::Output>
where Self: 'a + Sized + Send,

Convert this raw future into a MustBoxFuture.

impl<T> LayoutRaw for T

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Gets the layout of the type.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> Pointee for T

type Metadata = ()

The type for metadata in pointers and references to Self.

impl<T> Same for T

type Output = T

Should always be Self.

impl<SS, SP> SupersetOf<SS> for SP
where SS: SubsetOf<SP>,

fn to_subset(&self) -> Option<SS>

The inverse inclusion map: attempts to construct self from the equivalent element of its superset.

fn is_in_subset(&self) -> bool

Checks if self is actually part of its subset T (and can be converted to it).

fn to_subset_unchecked(&self) -> SS

Use with care! Same as self.to_subset but without any property checks. Always succeeds.

fn from_subset(element: &SS) -> SP

The inclusion map: converts self to the equivalent element of its superset.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.