sansio_executor/tokio.rs
1//! Tokio-based local executor implementation
2//!
3//! This module provides a local executor implementation using tokio's `LocalSet`.
4
5use core_affinity::{CoreId, set_for_current};
6use scoped_tls::scoped_thread_local;
7use std::{
8 future::Future,
9 io::Result,
10 pin::Pin,
11 task::{Context, Poll},
12 thread::{self, JoinHandle},
13};
14use tokio::task::LocalSet;
15
16scoped_thread_local!(pub(super) static LOCAL: LocalSet);
17
18/// A handle to a spawned task.
19///
20/// This is a wrapper around tokio's `JoinHandle` that provides a consistent API.
21///
22/// When awaited, returns `Result<T, TaskError>`:
23/// - `Ok(T)`: The task completed successfully
24/// - `Err(TaskError)`: The task panicked or was aborted/cancelled
25///
26/// # Example
27///
28/// ```rust,no_run
29/// use sansio_executor::{LocalExecutorBuilder, spawn_local};
30///
31/// LocalExecutorBuilder::default().run(async {
32/// let task = spawn_local(async { 42 });
33/// let result = task.await.unwrap();
34/// assert_eq!(result, 42);
35/// });
36/// ```
37pub struct Task<T> {
38 inner: tokio::task::JoinHandle<T>,
39}
40
41impl<T> Future for Task<T> {
42 type Output = std::result::Result<T, TaskError>;
43
44 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 Pin::new(&mut self.inner)
46 .poll(cx)
47 .map(|result| result.map_err(|e| TaskError { inner: e }))
48 }
49}
50
51impl<T> Task<T> {
52 /// Detaches the task, allowing it to run in the background.
53 ///
54 /// This consumes the task handle and allows the task to continue running
55 /// without being awaited. The task will run to completion in the background.
56 ///
57 /// # Example
58 ///
59 /// ```rust,no_run
60 /// use sansio_executor::{LocalExecutorBuilder, spawn_local};
61 ///
62 /// LocalExecutorBuilder::default().run(async {
63 /// let task = spawn_local(async {
64 /// println!("Running in background");
65 /// });
66 ///
67 /// // Detach the task - it continues running
68 /// task.detach();
69 ///
70 /// // We can't await it anymore, but it will complete
71 /// });
72 /// ```
73 pub fn detach(self) {
74 // In tokio, dropping the JoinHandle allows the task to continue running
75 drop(self.inner);
76 }
77
78 /// Cancels the task.
79 ///
80 /// This immediately aborts the task's execution.
81 ///
82 /// # Example
83 ///
84 /// ```rust,no_run
85 /// use sansio_executor::{LocalExecutorBuilder, spawn_local};
86 ///
87 /// LocalExecutorBuilder::default().run(async {
88 /// let task = spawn_local(async {
89 /// println!("This will not print");
90 /// });
91 ///
92 /// // Cancel the task
93 /// task.cancel();
94 /// });
95 /// ```
96 pub fn cancel(self) {
97 self.inner.abort();
98 }
99}
100
101/// Error returned when a spawned task fails.
102///
103/// This can occur when:
104/// - The task panics
105/// - The task is aborted/cancelled
106#[derive(Debug)]
107pub struct TaskError {
108 inner: tokio::task::JoinError,
109}
110
111impl std::fmt::Display for TaskError {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}", self.inner)
114 }
115}
116
117impl std::error::Error for TaskError {}
118
119/// A factory that can be used to configure and create a tokio [`LocalSet`].
120#[derive(Debug, Default)]
121pub struct LocalExecutorBuilder {
122 core_id: Option<CoreId>,
123 name: String,
124}
125
126impl LocalExecutorBuilder {
127 /// Creates a new LocalExecutorBuilder
128 pub fn new() -> Self {
129 Self::default()
130 }
131
132 /// Names the thread-to-be. Currently, the name is used for identification only in panic messages.
133 pub fn name(mut self, name: &str) -> Self {
134 self.name = String::from(name);
135 self
136 }
137
138 /// Pins the thread to the specified CPU core
139 pub fn core_id(mut self, core_id: CoreId) -> Self {
140 self.core_id = Some(core_id);
141 self
142 }
143
144 /// Runs the local executor on the current thread until the given future completes.
145 pub fn run<T>(mut self, f: impl Future<Output = T>) -> T {
146 if let Some(core_id) = self.core_id.take() {
147 set_for_current(core_id);
148 }
149
150 let rt = tokio::runtime::Builder::new_current_thread()
151 .enable_all()
152 .build()
153 .expect("Failed to build tokio runtime");
154
155 let local_set = LocalSet::new();
156 LOCAL.set(&local_set, || rt.block_on(local_set.run_until(f)))
157 }
158
159 /// Spawns a thread to run the local executor until the given future completes.
160 pub fn spawn<G, F, T>(mut self, fut_gen: G) -> Result<JoinHandle<T>>
161 where
162 G: FnOnce() -> F + Send + 'static,
163 F: Future<Output = T> + 'static,
164 T: Send + 'static,
165 {
166 let mut core_id = self.core_id.take();
167
168 thread::Builder::new().name(self.name).spawn(move || {
169 if let Some(core_id) = core_id.take() {
170 set_for_current(core_id);
171 }
172
173 let rt = tokio::runtime::Builder::new_current_thread()
174 .enable_all()
175 .build()
176 .expect("Failed to build tokio runtime");
177
178 let local_set = LocalSet::new();
179 LOCAL.set(&local_set, || rt.block_on(local_set.run_until(fut_gen())))
180 })
181 }
182}
183
184/// Spawns a task onto the current single-threaded executor.
185///
186/// If called from a tokio [`LocalSet`], the task is spawned on it.
187/// Otherwise, this method panics.
188///
189/// Returns a [`Task<T>`] that implements `Future<Output = Result<T, TaskError>>`.
190/// The task can be awaited to retrieve its result.
191///
192/// # Panics
193///
194/// Panics if called outside of a `LocalSet` context.
195///
196/// # Example
197///
198/// ```rust,no_run
199/// use sansio_executor::{LocalExecutorBuilder, spawn_local};
200///
201/// LocalExecutorBuilder::default().run(async {
202/// let task1 = spawn_local(async { 1 + 1 });
203/// let task2 = spawn_local(async { 2 + 2 });
204///
205/// let result1 = task1.await.unwrap();
206/// let result2 = task2.await.unwrap();
207///
208/// println!("Results: {}, {}", result1, result2);
209/// });
210/// ```
211pub fn spawn_local<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
212 if LOCAL.is_set() {
213 LOCAL.with(|local_set| Task {
214 inner: local_set.spawn_local(future),
215 })
216 } else {
217 panic!("`spawn_local()` must be called from a tokio `LocalSet`")
218 }
219}
220
221/// Yields to allow other tasks in the same executor to run.
222///
223/// This is an async function. Call it as `yield_local().await`.
224///
225/// This function yields execution to allow other tasks in the same LocalSet to run.
226///
227/// # Example
228///
229/// ```rust,no_run
230/// use sansio_executor::{LocalExecutorBuilder, spawn_local, yield_local};
231///
232/// LocalExecutorBuilder::default().run(async {
233/// spawn_local(async {
234/// println!("Task 1 starting");
235/// yield_local().await; // Let other tasks run
236/// println!("Task 1 resuming");
237/// });
238///
239/// spawn_local(async {
240/// println!("Task 2 running");
241/// });
242/// });
243/// ```
244pub async fn yield_local() {
245 tokio::task::yield_now().await
246}