sansio_executor/
tokio.rs

1//! Tokio-based local executor implementation
2//!
3//! This module provides a local executor implementation using tokio's `LocalSet`.
4
5use core_affinity::{CoreId, set_for_current};
6use scoped_tls::scoped_thread_local;
7use std::{
8    future::Future,
9    io::Result,
10    pin::Pin,
11    task::{Context, Poll},
12    thread::{self, JoinHandle},
13};
14use tokio::task::LocalSet;
15
16scoped_thread_local!(pub(super) static LOCAL: LocalSet);
17
18/// A handle to a spawned task.
19///
20/// This is a wrapper around tokio's `JoinHandle` that provides a consistent API.
21///
22/// When awaited, returns `Result<T, TaskError>`:
23/// - `Ok(T)`: The task completed successfully
24/// - `Err(TaskError)`: The task panicked or was aborted/cancelled
25///
26/// # Example
27///
28/// ```rust,no_run
29/// use sansio_executor::{LocalExecutorBuilder, spawn_local};
30///
31/// LocalExecutorBuilder::default().run(async {
32///     let task = spawn_local(async { 42 });
33///     let result = task.await.unwrap();
34///     assert_eq!(result, 42);
35/// });
36/// ```
37pub struct Task<T> {
38    inner: tokio::task::JoinHandle<T>,
39}
40
41impl<T> Future for Task<T> {
42    type Output = std::result::Result<T, TaskError>;
43
44    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        Pin::new(&mut self.inner)
46            .poll(cx)
47            .map(|result| result.map_err(|e| TaskError { inner: e }))
48    }
49}
50
51impl<T> Task<T> {
52    /// Detaches the task, allowing it to run in the background.
53    ///
54    /// This consumes the task handle and allows the task to continue running
55    /// without being awaited. The task will run to completion in the background.
56    ///
57    /// # Example
58    ///
59    /// ```rust,no_run
60    /// use sansio_executor::{LocalExecutorBuilder, spawn_local};
61    ///
62    /// LocalExecutorBuilder::default().run(async {
63    ///     let task = spawn_local(async {
64    ///         println!("Running in background");
65    ///     });
66    ///
67    ///     // Detach the task - it continues running
68    ///     task.detach();
69    ///
70    ///     // We can't await it anymore, but it will complete
71    /// });
72    /// ```
73    pub fn detach(self) {
74        // In tokio, dropping the JoinHandle allows the task to continue running
75        drop(self.inner);
76    }
77
78    /// Cancels the task.
79    ///
80    /// This immediately aborts the task's execution.
81    ///
82    /// # Example
83    ///
84    /// ```rust,no_run
85    /// use sansio_executor::{LocalExecutorBuilder, spawn_local};
86    ///
87    /// LocalExecutorBuilder::default().run(async {
88    ///     let task = spawn_local(async {
89    ///         println!("This will not print");
90    ///     });
91    ///
92    ///     // Cancel the task
93    ///     task.cancel();
94    /// });
95    /// ```
96    pub fn cancel(self) {
97        self.inner.abort();
98    }
99}
100
101/// Error returned when a spawned task fails.
102///
103/// This can occur when:
104/// - The task panics
105/// - The task is aborted/cancelled
106#[derive(Debug)]
107pub struct TaskError {
108    inner: tokio::task::JoinError,
109}
110
111impl std::fmt::Display for TaskError {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{}", self.inner)
114    }
115}
116
117impl std::error::Error for TaskError {}
118
119/// A factory that can be used to configure and create a tokio [`LocalSet`].
120#[derive(Debug, Default)]
121pub struct LocalExecutorBuilder {
122    core_id: Option<CoreId>,
123    name: String,
124}
125
126impl LocalExecutorBuilder {
127    /// Creates a new LocalExecutorBuilder
128    pub fn new() -> Self {
129        Self::default()
130    }
131
132    /// Names the thread-to-be. Currently, the name is used for identification only in panic messages.
133    pub fn name(mut self, name: &str) -> Self {
134        self.name = String::from(name);
135        self
136    }
137
138    /// Pins the thread to the specified CPU core
139    pub fn core_id(mut self, core_id: CoreId) -> Self {
140        self.core_id = Some(core_id);
141        self
142    }
143
144    /// Runs the local executor on the current thread until the given future completes.
145    pub fn run<T>(mut self, f: impl Future<Output = T>) -> T {
146        if let Some(core_id) = self.core_id.take() {
147            set_for_current(core_id);
148        }
149
150        let rt = tokio::runtime::Builder::new_current_thread()
151            .enable_all()
152            .build()
153            .expect("Failed to build tokio runtime");
154
155        let local_set = LocalSet::new();
156        LOCAL.set(&local_set, || rt.block_on(local_set.run_until(f)))
157    }
158
159    /// Spawns a thread to run the local executor until the given future completes.
160    pub fn spawn<G, F, T>(mut self, fut_gen: G) -> Result<JoinHandle<T>>
161    where
162        G: FnOnce() -> F + Send + 'static,
163        F: Future<Output = T> + 'static,
164        T: Send + 'static,
165    {
166        let mut core_id = self.core_id.take();
167
168        thread::Builder::new().name(self.name).spawn(move || {
169            if let Some(core_id) = core_id.take() {
170                set_for_current(core_id);
171            }
172
173            let rt = tokio::runtime::Builder::new_current_thread()
174                .enable_all()
175                .build()
176                .expect("Failed to build tokio runtime");
177
178            let local_set = LocalSet::new();
179            LOCAL.set(&local_set, || rt.block_on(local_set.run_until(fut_gen())))
180        })
181    }
182}
183
184/// Spawns a task onto the current single-threaded executor.
185///
186/// If called from a tokio [`LocalSet`], the task is spawned on it.
187/// Otherwise, this method panics.
188///
189/// Returns a [`Task<T>`] that implements `Future<Output = Result<T, TaskError>>`.
190/// The task can be awaited to retrieve its result.
191///
192/// # Panics
193///
194/// Panics if called outside of a `LocalSet` context.
195///
196/// # Example
197///
198/// ```rust,no_run
199/// use sansio_executor::{LocalExecutorBuilder, spawn_local};
200///
201/// LocalExecutorBuilder::default().run(async {
202///     let task1 = spawn_local(async { 1 + 1 });
203///     let task2 = spawn_local(async { 2 + 2 });
204///
205///     let result1 = task1.await.unwrap();
206///     let result2 = task2.await.unwrap();
207///
208///     println!("Results: {}, {}", result1, result2);
209/// });
210/// ```
211pub fn spawn_local<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
212    if LOCAL.is_set() {
213        LOCAL.with(|local_set| Task {
214            inner: local_set.spawn_local(future),
215        })
216    } else {
217        panic!("`spawn_local()` must be called from a tokio `LocalSet`")
218    }
219}
220
221/// Yields to allow other tasks in the same executor to run.
222///
223/// This is an async function. Call it as `yield_local().await`.
224///
225/// This function yields execution to allow other tasks in the same LocalSet to run.
226///
227/// # Example
228///
229/// ```rust,no_run
230/// use sansio_executor::{LocalExecutorBuilder, spawn_local, yield_local};
231///
232/// LocalExecutorBuilder::default().run(async {
233///     spawn_local(async {
234///         println!("Task 1 starting");
235///         yield_local().await;  // Let other tasks run
236///         println!("Task 1 resuming");
237///     });
238///
239///     spawn_local(async {
240///         println!("Task 2 running");
241///     });
242/// });
243/// ```
244pub async fn yield_local() {
245    tokio::task::yield_now().await
246}