Struct tokio::task::LocalSet

pub struct LocalSet { /* fields omitted */ }
This is supported on crate feature rt only.

A set of tasks which are executed on the same thread.

In some cases, it is necessary to run one or more futures that do not implement Send and thus are unsafe to send between threads. In these cases, a local task set may be used to schedule one or more !Send futures to run together on the same thread.

For example, the following code will not compile:

use std::rc::Rc;

#[tokio::main]
async fn main() {
    // `Rc` does not implement `Send`, and thus may not be sent between
    // threads safely.
    let unsend_data = Rc::new("my unsend data...");

    let unsend_data = unsend_data.clone();
    // Because the `async` block here moves `unsend_data`, the future is `!Send`.
    // Since `tokio::spawn` requires the spawned future to implement `Send`, this
    // will not compile.
    tokio::spawn(async move {
        println!("{}", unsend_data);
        // ...
    }).await.unwrap();
}
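
For comparison, here is a minimal standard-library sketch of what the `Send` bound permits. The helper name `len_on_other_thread` is hypothetical and not part of tokio. It uses `std::thread::spawn`, which, like `tokio::spawn`, only accepts values that are `Send`.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: moves `data` to another thread and reads it there.
// This compiles because `Arc<String>` implements `Send`.
fn len_on_other_thread(data: Arc<String>) -> usize {
    thread::spawn(move || data.len()).join().unwrap()
}

fn main() {
    let data = Arc::new(String::from("my sendable data"));
    println!("{}", len_on_other_thread(data));
    // Changing `Arc` to `std::rc::Rc` in the signature above is rejected by
    // the compiler, for the same reason `tokio::spawn` rejects the future in
    // the example above: `Rc` is `!Send`.
}
```

Switching to `Arc` is often the simplest fix when the data can be shared atomically. A local task set is for the cases where the `!Send` type cannot be replaced.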

Use with run_until

To spawn !Send futures, we can use a local task set to schedule them on the thread calling Runtime::block_on. When running inside of the local task set, we can use task::spawn_local, which can spawn !Send futures. For example:

use std::rc::Rc;
use tokio::task;

#[tokio::main]
async fn main() {
    let unsend_data = Rc::new("my unsend data...");

    // Construct a local task set that can run `!Send` futures.
    let local = task::LocalSet::new();

    // Run the local task set.
    local.run_until(async move {
        let unsend_data = unsend_data.clone();
        // `spawn_local` ensures that the future is spawned on the local
        // task set.
        task::spawn_local(async move {
            println!("{}", unsend_data);
            // ...
        }).await.unwrap();
    }).await;
}

Note: The run_until method can only be used in #[tokio::main], #[tokio::test] or directly inside a call to Runtime::block_on. It cannot be used inside a task spawned with tokio::spawn.

Awaiting a LocalSet

Additionally, a LocalSet itself implements Future, completing when all tasks spawned on the LocalSet complete. This can be used to run several futures on a LocalSet and drive the whole set until they complete. For example:

use tokio::{task, time};
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let unsend_data = Rc::new("world");
    let local = task::LocalSet::new();

    let unsend_data2 = unsend_data.clone();
    local.spawn_local(async move {
        // ...
        println!("hello {}", unsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", unsend_data)
    });

    // ...

    local.await;
}

Note: Awaiting a LocalSet can only be done inside #[tokio::main], #[tokio::test] or directly inside a call to Runtime::block_on. It cannot be used inside a task spawned with tokio::spawn.

Use inside tokio::spawn

The two methods mentioned above cannot be used inside tokio::spawn, so to spawn !Send futures from inside tokio::spawn, we need to do something else. The solution is to create the LocalSet somewhere else, and communicate with it using an mpsc channel.

The following example puts the LocalSet inside a new thread.

use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;

// This enum describes the task you want to spawn. Here we include
// some simple examples. The oneshot channel allows sending a response
// to the spawner.
#[derive(Debug)]
enum Task {
    PrintNumber(u32),
    AddOne(u32, oneshot::Sender<u32>),
}

#[derive(Clone)]
struct LocalSpawner {
    send: mpsc::UnboundedSender<Task>,
}

impl LocalSpawner {
    pub fn new() -> Self {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    tokio::task::spawn_local(run_task(new_task));
                }
                // If the while loop returns, then all the LocalSpawner
                // objects have been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Self {
            send,
        }
    }

    pub fn spawn(&self, task: Task) {
        self.send.send(task).expect("Thread with LocalSet has shut down.");
    }
}

// This task may do !Send stuff. We use printing a number as an example,
// but it could be anything.
//
// `Task` is an enum so that many different kinds of operations can be
// spawned through the same channel.
async fn run_task(task: Task) {
    match task {
        Task::PrintNumber(n) => {
            println!("{}", n);
        },
        Task::AddOne(n, response) => {
            // We ignore failures to send the response.
            let _ = response.send(n + 1);
        },
    }
}

#[tokio::main]
async fn main() {
    let spawner = LocalSpawner::new();

    let (send, response) = oneshot::channel();
    spawner.spawn(Task::AddOne(10, send));
    let eleven = response.await.unwrap();
    assert_eq!(eleven, 11);
}

Implementations

impl LocalSet

pub fn new() -> LocalSet

Returns a new local task set.

pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,

JoinHandle<T> implements Future with Output = Result<T, JoinError>.

Spawns a !Send task onto the local task set.

This task is guaranteed to be run on the current thread.

Unlike the free function spawn_local, this method may be used to spawn local tasks when the task set is not running. For example:

use tokio::task;

#[tokio::main]
async fn main() {
    let local = task::LocalSet::new();

    // Spawn a future on the local set. This future will be run when
    // we call `run_until` to drive the task set.
    local.spawn_local(async {
       // ...
    });

    // Run the local task set.
    local.run_until(async move {
        // ...
    }).await;

    // When `run_until` finishes, we can spawn _more_ futures, which will
    // run in subsequent calls to `run_until`.
    local.spawn_local(async {
       // ...
    });

    local.run_until(async move {
        // ...
    }).await;
}

pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output
where
    F: Future,

Runs a future to completion on the provided runtime, driving any local futures spawned on this task set on the current thread.

This runs the given future on the runtime, blocking until it is complete, and yielding its resolved result. Any tasks or timers which the future spawns internally will be executed on the runtime. The future may also call spawn_local to spawn additional local futures on the current thread.

This method should not be called from an asynchronous context.

Panics

This function panics if the executor is at capacity, if the provided future panics, or if called within an asynchronous execution context.

Notes

Since this function internally calls Runtime::block_on, and drives futures in the local task set inside that call to block_on, the local futures may not use in-place blocking. If a blocking call needs to be issued from a local task, the spawn_blocking API may be used instead.

For example, this will panic:

use tokio::runtime::Runtime;
use tokio::task;

fn main() {
    let rt = Runtime::new().unwrap();
    let local = task::LocalSet::new();
    local.block_on(&rt, async {
        let join = task::spawn_local(async {
            // In-place blocking is not permitted inside a local task.
            let blocking_result = task::block_in_place(|| {
                // ...
            });
            // ...
        });
        join.await.unwrap();
    });
}

This, however, will not panic:

use tokio::runtime::Runtime;
use tokio::task;

fn main() {
    let rt = Runtime::new().unwrap();
    let local = task::LocalSet::new();
    local.block_on(&rt, async {
        let join = task::spawn_local(async {
            // The blocking work runs on the runtime's blocking thread pool.
            let blocking_result = task::spawn_blocking(|| {
                // ...
            }).await;
            // ...
        });
        join.await.unwrap();
    });
}

pub async fn run_until<F>(&self, future: F) -> F::Output
where
    F: Future,

Runs a future to completion on the local set, returning its output.

This returns a future that runs the given future with a local set, allowing it to call spawn_local to spawn additional !Send futures. Any local futures spawned on the local set will be driven in the background until the future passed to run_until completes. When the future passed to run_until finishes, any local futures which have not completed will remain on the local set, and will be driven on subsequent calls to run_until or when awaiting the local set itself.

Examples

use tokio::task;

#[tokio::main]
async fn main() {
    task::LocalSet::new().run_until(async {
        task::spawn_local(async move {
            // ...
        }).await.unwrap();
        // ...
    }).await;
}

Trait Implementations

impl Debug for LocalSet

impl Default for LocalSet

impl Drop for LocalSet

impl Future for LocalSet

type Output = ()

The type of value produced on completion.

Auto Trait Implementations

impl !RefUnwindSafe for LocalSet

impl !Send for LocalSet

impl !Sync for LocalSet

impl Unpin for LocalSet

impl !UnwindSafe for LocalSet

Blanket Implementations

impl<T> Any for T
where
    T: 'static + ?Sized,

impl<T> Borrow<T> for T
where
    T: ?Sized,

impl<T> BorrowMut<T> for T
where
    T: ?Sized,

impl<T> From<T> for T

impl<T, U> Into<U> for T
where
    U: From<T>,

impl<F> IntoFuture for F
where
    F: Future,

type Output = <F as Future>::Output

🔬 This is a nightly-only experimental API. (into_future)

The output that the future will produce on completion.

type Future = F

🔬 This is a nightly-only experimental API. (into_future)

Which kind of future are we turning this into?

impl<T, U> TryFrom<U> for T
where
    U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T
where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.