glommio/executor/mod.rs
1// Unless explicitly stated otherwise all files in this repository are licensed
2// under the MIT/Apache-2.0 License, at your convenience
3//
4// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
5//
6//! Async executor.
7//!
8//! This crate offers two kinds of executors: single-threaded and
9//! multi-threaded.
10//!
11//! # Examples
12//!
13//! Run four single-threaded executors concurrently:
14//!
15//! ```
16//! use glommio::{
17//! timer::Timer,
18//! LocalExecutor,
19//! LocalExecutorBuilder,
20//! LocalExecutorPoolBuilder,
21//! PoolPlacement,
22//! };
23//!
24//! LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
25//! .on_all_shards(move || async {
26//! Timer::new(std::time::Duration::from_millis(100)).await;
27//! println!("Hello world!");
28//! })
29//! .expect("failed to spawn local executors")
30//! .join_all();
31//! ```
32
33#![warn(missing_docs, missing_debug_implementations)]
34
35use crate::{
36 error::BuilderErrorKind,
37 executor::stall::StallDetector,
38 io::DmaBuffer,
39 parking, reactor,
40 sys::{self, blocking::BlockingThreadPool},
41 task::{self, waker_fn::dummy_waker},
42 GlommioError, IoRequirements, IoStats, Latency, Reactor, Shares,
43};
44use ahash::AHashMap;
45use futures_lite::pin;
46use latch::{Latch, LatchState};
47use log::warn;
48pub use placement::{CpuSet, Placement, PoolPlacement};
49use std::{
50 cell::RefCell,
51 collections::{hash_map::Entry, BinaryHeap},
52 fmt,
53 future::Future,
54 io,
55 marker::PhantomData,
56 mem::MaybeUninit,
57 ops::{Deref, DerefMut},
58 pin::Pin,
59 rc::Rc,
60 sync::{Arc, Mutex},
61 task::{Context, Poll},
62 thread::{Builder, JoinHandle},
63 time::{Duration, Instant},
64};
65use tracing::trace;
66
67mod latch;
68mod multitask;
69mod placement;
70pub mod stall;
71
/// Thread-name prefix used by the builders when the caller does not set one.
pub(crate) const DEFAULT_EXECUTOR_NAME: &str = "unnamed";
/// Preempt timer the builders start with (100ms) unless overridden.
pub(crate) const DEFAULT_PREEMPT_TIMER: Duration = Duration::from_millis(100);
/// I/O memory the builders reserve by default: `10 << 20` bytes, i.e. 10 MiB.
pub(crate) const DEFAULT_IO_MEMORY: usize = 10 << 20;
/// Ring depth the builders start with unless overridden.
pub(crate) const DEFAULT_RING_SUBMISSION_DEPTH: usize = 128;
76
/// Result type alias that removes the need to specify a type parameter
/// that's only valid in the channel variants of the error. Otherwise, it
/// might be confused with the error (`E`) that a result usually has in
/// the second type parameter.
///
/// Every `Result<T>` written in this module is therefore a
/// `crate::Result<T, ()>`.
type Result<T> = crate::Result<T, ()>;
82
// With the `native-tls` feature the thread's executor is tracked through a raw
// pointer in a `#[thread_local]` static. It starts out null, which readers such
// as `executor_id` treat as "no executor on this thread".
#[cfg(feature = "native-tls")]
#[thread_local]
static mut LOCAL_EX: *const LocalExecutor = std::ptr::null();

// Without `native-tls` the same slot is provided by `scoped_tls`; readers check
// `is_set()` before calling `with`.
#[cfg(not(feature = "native-tls"))]
scoped_tls::scoped_thread_local!(static LOCAL_EX: LocalExecutor);
89
/// Returns a proxy struct to the [`LocalExecutor`]
///
/// The returned [`ExecutorProxy`] is built from an empty struct literal, so
/// it carries no per-call state of its own.
#[inline(always)]
pub fn executor() -> ExecutorProxy {
    ExecutorProxy {}
}
95
96pub(crate) fn executor_id() -> Option<usize> {
97 #[cfg(not(feature = "native-tls"))]
98 {
99 if LOCAL_EX.is_set() {
100 Some(LOCAL_EX.with(|ex| ex.id))
101 } else {
102 None
103 }
104 }
105
106 #[cfg(feature = "native-tls")]
107 unsafe {
108 LOCAL_EX.as_ref().map(|ex| ex.id)
109 }
110}
111
/// An opaque handle indicating in which queue a group of tasks will execute.
///
/// Tasks in the same group will execute in FIFO order but no guarantee is made
/// about ordering on different task queues.
#[derive(Default, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct TaskQueueHandle {
    index: usize,
}

impl TaskQueueHandle {
    /// Returns a numeric ID that uniquely identifies this Task queue
    pub fn index(&self) -> usize {
        let Self { index } = *self;
        index
    }
}
126
/// Scheduler-side state of one task queue: the executor holding its tasks plus
/// the bookkeeping (shares, virtual runtime, statistics) used to pick which
/// queue runs next.
#[derive(Debug)]
pub(crate) struct TaskQueue {
    /// Executor that stores this queue's tasks; `get_task` pulls from it and
    /// `account_vruntime` asks it whether the queue is still active.
    pub(crate) ex: Rc<multitask::LocalExecutor>,
    /// Whether the queue is currently considered active by the scheduler.
    active: bool,
    /// Share configuration; for `Shares::Dynamic` the reciprocal is re-sampled
    /// in `prepare_to_run`.
    shares: Shares,
    /// Virtual runtime accumulated so far; this is the key the `Ord` impl
    /// compares on.
    vruntime: u64,
    /// I/O requirements; `latency_req` feeds the preempt timer computation.
    io_requirements: IoRequirements,
    /// Name given to the queue at construction.
    name: String,
    /// Last time the dynamic reciprocal shares were refreshed.
    last_adjustment: Instant,
    // for dynamic shares classes
    /// Cleared by `prepare_to_run` before the queue runs.
    yielded: bool,
    /// Per-queue statistics, including the cached reciprocal shares.
    stats: TaskQueueStats,
}
140
141// Impl a custom order so we use a min-heap
142impl Ord for TaskQueue {
143 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
144 other.vruntime.cmp(&self.vruntime)
145 }
146}
147
148impl PartialOrd for TaskQueue {
149 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150 Some(other.vruntime.cmp(&self.vruntime))
151 }
152}
153
154impl PartialEq for TaskQueue {
155 fn eq(&self, other: &Self) -> bool {
156 self.vruntime == other.vruntime
157 }
158}
159
160impl Eq for TaskQueue {}
161
162impl TaskQueue {
163 fn new<S>(
164 index: TaskQueueHandle,
165 name: S,
166 shares: Shares,
167 ioreq: IoRequirements,
168 ) -> Rc<RefCell<Self>>
169 where
170 S: Into<String>,
171 {
172 Rc::new(RefCell::new(TaskQueue {
173 ex: Rc::new(multitask::LocalExecutor::new()),
174 active: false,
175 stats: TaskQueueStats::new(index, shares.reciprocal_shares()),
176 shares,
177 vruntime: 0,
178 io_requirements: ioreq,
179 name: name.into(),
180 last_adjustment: Instant::now(),
181 yielded: false,
182 }))
183 }
184
185 fn is_active(&self) -> bool {
186 self.active
187 }
188
189 fn get_task(&mut self) -> Option<multitask::Runnable> {
190 self.ex.get_task()
191 }
192
193 fn yielded(&self) -> bool {
194 self.yielded
195 }
196
197 fn prepare_to_run(&mut self, now: Instant) {
198 self.yielded = false;
199 if let Shares::Dynamic(bm) = &self.shares {
200 if now.saturating_duration_since(self.last_adjustment) > bm.adjustment_period() {
201 self.last_adjustment = now;
202 self.stats.reciprocal_shares = self.shares.reciprocal_shares();
203 }
204 }
205 }
206
207 fn account_vruntime(&mut self, delta: Duration) -> Option<u64> {
208 let delta_scaled = (self.stats.reciprocal_shares * (delta.as_nanos() as u64)) >> 12;
209 self.stats.runtime += delta;
210 self.stats.queue_selected += 1;
211 self.active = self.ex.is_active();
212
213 let vruntime = self.vruntime.checked_add(delta_scaled);
214 if let Some(x) = vruntime {
215 self.vruntime = x;
216 }
217 vruntime
218 }
219}
220
221pub(crate) fn bind_to_cpu_set(cpus: impl IntoIterator<Item = usize>) -> Result<()> {
222 let mut cpuset = nix::sched::CpuSet::new();
223 for cpu in cpus {
224 cpuset.set(cpu).map_err(|e| to_io_error!(e))?;
225 }
226 let pid = nix::unistd::Pid::from_raw(0);
227 nix::sched::sched_setaffinity(pid, &cpuset).map_err(|e| Into::into(to_io_error!(e)))
228}
229
// Handing out references would drag `Rc`s and `RefCell`s into the API, so the
// stats are plain `Copy` data instead. They should be copied infrequently, and
// if one source ever produces enough stats to fill a Kb, maybe you should
// rethink your life choices.
/// Allows information about the current state of this executor to be consumed
/// by applications.
#[derive(Debug, Copy, Clone, Default)]
pub struct ExecutorStats {
    executor_runtime: Duration,
    // total_runtime include poll_io time, exclude spin loop time
    total_runtime: Duration,
    scheduler_runs: u64,
    tasks_executed: u64,
}

impl ExecutorStats {
    /// Creates a stats record with zero durations and zero counters, which is
    /// exactly what the derived `Default` produces.
    fn new() -> Self {
        Self::default()
    }

    /// The total amount of runtime in this executor so far.
    ///
    /// This is especially important for spinning executors, since the amount of
    /// CPU time you will see in the operating system will be a far cry from
    /// the CPU time it actually spent executing. Sleeping or Spinning are
    /// not accounted here
    pub fn executor_runtime(&self) -> Duration {
        let Self { executor_runtime, .. } = *self;
        executor_runtime
    }

    /// The total amount of runtime in this executor, plus poll io time
    pub fn total_runtime(&self) -> Duration {
        let Self { total_runtime, .. } = *self;
        total_runtime
    }

    /// Returns the number of times the scheduler loop was called. The Glommio
    /// scheduler selects a task queue to run and runs many tasks in that
    /// task queue, so this number corresponds to how many times it was
    /// called upon to select a new queue.
    pub fn scheduler_runs(&self) -> u64 {
        let Self { scheduler_runs, .. } = *self;
        scheduler_runs
    }

    /// Returns the number of tasks executed in the system, over all queues.
    pub fn tasks_executed(&self) -> u64 {
        let Self { tasks_executed, .. } = *self;
        tasks_executed
    }
}
283
284#[derive(Debug, Copy, Clone)]
285/// Allows information about the current state of a particular task queue to be
286/// consumed by applications.
287pub struct TaskQueueStats {
288 index: TaskQueueHandle,
289 // so we can easily produce a handle
290 reciprocal_shares: u64,
291 queue_selected: u64,
292 runtime: Duration,
293}
294
295impl TaskQueueStats {
296 fn new(index: TaskQueueHandle, reciprocal_shares: u64) -> Self {
297 Self {
298 index,
299 reciprocal_shares,
300 runtime: Duration::from_nanos(0),
301 queue_selected: 0,
302 }
303 }
304
305 /// Returns a numeric ID that uniquely identifies this Task queue
306 pub fn index(&self) -> TaskQueueHandle {
307 self.index
308 }
309
310 /// Returns the current number of shares in this task queue.
311 ///
312 /// If the task queue is configured to use static shares this will never
313 /// change. If the task queue is configured to use dynamic shares, this
314 /// returns a sample of the shares values the last time the scheduler
315 /// ran.
316 pub fn current_shares(&self) -> usize {
317 ((1u64 << 22) / self.reciprocal_shares) as usize
318 }
319
320 /// Returns the accumulated runtime this task queue had received since the
321 /// beginning of its execution
322 pub fn runtime(&self) -> Duration {
323 self.runtime
324 }
325
326 /// Returns the number of times this queue was selected to be executed. In
327 /// conjunction with the runtime, you can extract an average of the
328 /// amount of time this queue tends to run for
329 pub fn queue_selected(&self) -> u64 {
330 self.queue_selected
331 }
332
333 pub(crate) fn take(&mut self) -> Self {
334 std::mem::replace(
335 self,
336 Self {
337 index: self.index,
338 reciprocal_shares: self.reciprocal_shares,
339 queue_selected: Default::default(),
340 runtime: Default::default(),
341 },
342 )
343 }
344}
345
/// Scheduler state shared by all task queues of one executor.
#[derive(Debug)]
struct ExecutorQueues {
    /// Queues that are currently active. Because `TaskQueue`'s ordering is
    /// reversed on `vruntime`, the top of this heap is the queue with the
    /// smallest `vruntime`.
    active_executors: BinaryHeap<Rc<RefCell<TaskQueue>>>,
    /// Known task queues, addressed by a `usize` key (presumably the queue
    /// index; confirm against the code that inserts into this map).
    available_executors: AHashMap<usize, Rc<RefCell<TaskQueue>>>,
    /// Starts as `None`; NOTE(review): looks like the queue currently being
    /// run, but that is not established by the code in view.
    active_executing: Option<Rc<RefCell<TaskQueue>>>,
    /// Starts at 1, since 0 is the default.
    executor_index: usize,
    /// Base value from which a newly activated queue's `vruntime` is derived
    /// (`default_vruntime + 1` in `maybe_activate`).
    default_vruntime: u64,
    /// Preempt timer currently in effect; recomputed by
    /// `reevaluate_preempt_timer`.
    preempt_timer_duration: Duration,
    /// Preempt timer used for queues whose latency is `Latency::NotImportant`
    /// and when no queue is active.
    default_preempt_timer_duration: Duration,
    /// Optional spin duration handed over at construction.
    spin_before_park: Option<Duration>,
    /// Executor-wide statistics.
    stats: ExecutorStats,
}
358
359impl ExecutorQueues {
360 fn new(preempt_timer_duration: Duration, spin_before_park: Option<Duration>) -> Self {
361 ExecutorQueues {
362 active_executors: BinaryHeap::new(),
363 available_executors: AHashMap::new(),
364 active_executing: None,
365 executor_index: 1, // 0 is the default
366 default_vruntime: 0,
367 preempt_timer_duration,
368 default_preempt_timer_duration: preempt_timer_duration,
369 spin_before_park,
370 stats: ExecutorStats::new(),
371 }
372 }
373
374 fn reevaluate_preempt_timer(&mut self) {
375 self.preempt_timer_duration = self
376 .active_executors
377 .iter()
378 .map(|tq| match tq.borrow().io_requirements.latency_req {
379 Latency::NotImportant => self.default_preempt_timer_duration,
380 Latency::Matters(d) => d,
381 })
382 .min()
383 .unwrap_or(self.default_preempt_timer_duration)
384 }
385
386 fn maybe_activate(&mut self, queue: Rc<RefCell<TaskQueue>>) {
387 let mut state = queue.borrow_mut();
388 if !state.is_active() {
389 state.vruntime = self.default_vruntime + 1;
390 state.active = true;
391 drop(state);
392 self.active_executors.push(queue);
393 self.reevaluate_preempt_timer();
394 }
395 }
396}
397
398/// A wrapper around a [`std::thread::JoinHandle`]
399#[derive(Debug)]
400pub struct ExecutorJoinHandle<T: Send + 'static>(JoinHandle<Result<T>>);
401
402impl<T: Send + 'static> ExecutorJoinHandle<T> {
403 /// See [`std::thread::JoinHandle::thread()`]
404 #[must_use]
405 pub fn thread(&self) -> &std::thread::Thread {
406 self.0.thread()
407 }
408
409 /// See [`std::thread::JoinHandle::join()`]
410 pub fn join(self) -> Result<T> {
411 match self.0.join() {
412 Err(err) => Err(GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(
413 err,
414 ))),
415 Ok(Err(err)) => Err(err),
416 Ok(Ok(res)) => Ok(res),
417 }
418 }
419}
420
/// A factory that can be used to configure and create a [`LocalExecutor`].
///
/// Methods can be chained on it in order to configure it.
///
/// The [`spawn`] method will take ownership of the builder, start the
/// [`LocalExecutor`] on a new thread with the given configuration, and return
/// a `Result` holding an [`ExecutorJoinHandle`] for that thread.
///
/// The [`LocalExecutor::default`] associated function uses a Builder with
/// default configuration and unwraps its return value.
///
/// You may want to use [`LocalExecutorBuilder::spawn`] instead of
/// [`LocalExecutor::default`], when you want to recover from a failure to
/// launch a thread. The [`LocalExecutor::default`] function will panic where
/// the Builder method will return a [`Result`](crate::Result).
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutorBuilder;
///
/// let builder = LocalExecutorBuilder::default();
/// let ex = builder.make().unwrap();
/// ```
///
/// [`LocalExecutor`]: struct.LocalExecutor.html
///
/// [`LocalExecutor::default`]: struct.LocalExecutor.html#method.default
///
/// [`LocalExecutorBuilder::spawn`]:
/// struct.LocalExecutorBuilder.html#method.spawn
///
/// [`spawn`]: struct.LocalExecutorBuilder.html#method.spawn
#[derive(Debug)]
pub struct LocalExecutorBuilder {
    /// The placement policy for the [`LocalExecutor`] to create
    placement: Placement,
    /// Spin for duration before parking a reactor
    spin_before_park: Option<Duration>,
    /// A name for the thread-to-be (if any), for identification in panic
    /// messages
    name: String,
    /// Amount of memory to reserve for storage I/O. This will be preallocated
    /// and registered with io_uring. It is still possible to use more than
    /// that, but it will come from the standard allocator and performance
    /// will suffer. Defaults to 10 MiB.
    io_memory: usize,
    /// The depth of the IO rings to create. This influences the level of IO
    /// concurrency. A higher ring depth allows a shard to submit a
    /// greater number of IO requests to the kernel at once.
    ring_depth: usize,
    /// How often to yield to other task queues
    preempt_timer_duration: Duration,
    /// Whether to record the latencies of individual IO requests
    record_io_latencies: bool,
    /// The placement policy of the blocking thread pool
    /// Defaults to one thread using the same placement strategy as the host
    /// executor
    blocking_thread_pool_placement: PoolPlacement,
    /// Whether to detect stalls in unyielding tasks.
    /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
    /// [`nix::libc::SIGUSR1`], so is disabled by default.
    detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}
484
485impl LocalExecutorBuilder {
486 /// Generates the base configuration for spawning a [`LocalExecutor`], from
487 /// which configuration methods can be chained.
488 /// The method's only argument is the [`Placement`] policy by which the
489 /// [`LocalExecutor`] is bound to the machine's hardware topology. i.e.
490 /// how many and which CPUs to use.
491 pub fn new(placement: Placement) -> LocalExecutorBuilder {
492 LocalExecutorBuilder {
493 placement: placement.clone(),
494 spin_before_park: None,
495 name: String::from(DEFAULT_EXECUTOR_NAME),
496 io_memory: DEFAULT_IO_MEMORY,
497 ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
498 preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
499 record_io_latencies: false,
500 blocking_thread_pool_placement: PoolPlacement::from(placement),
501 detect_stalls: None,
502 }
503 }
504
505 /// Spin for duration before parking a reactor
506 #[must_use = "The builder must be built to be useful"]
507 pub fn spin_before_park(mut self, spin: Duration) -> LocalExecutorBuilder {
508 self.spin_before_park = Some(spin);
509 self
510 }
511
512 /// Names the thread-to-be. Currently, the name is used for identification
513 /// only in panic messages.
514 #[must_use = "The builder must be built to be useful"]
515 pub fn name(mut self, name: &str) -> LocalExecutorBuilder {
516 self.name = String::from(name);
517 self
518 }
519
520 /// Amount of memory to reserve for storage I/O. This will be preallocated
521 /// and registered with io_uring. It is still possible to use more than
522 /// that, but it will come from the standard allocator and performance
523 /// will suffer.
524 ///
525 /// The system will always try to allocate at least 64 kiB for I/O memory,
526 /// and the default is 10 MiB.
527 #[must_use = "The builder must be built to be useful"]
528 pub fn io_memory(mut self, io_memory: usize) -> LocalExecutorBuilder {
529 self.io_memory = io_memory;
530 self
531 }
532
533 /// The depth of the IO rings to create. This influences the level of IO
534 /// concurrency. A higher ring depth allows a shard to submit a
535 /// greater number of IO requests to the kernel at once.
536 ///
537 /// Values above zero are valid and the default is 128.
538 #[must_use = "The builder must be built to be useful"]
539 pub fn ring_depth(mut self, ring_depth: usize) -> LocalExecutorBuilder {
540 assert!(ring_depth > 0, "ring depth should be strictly positive");
541 self.ring_depth = ring_depth;
542 self
543 }
544
545 /// How often [`need_preempt`] will return true by default.
546 ///
547 /// Lower values mean task queues will switch execution more often, which
548 /// can help latency but harm throughput. When individual task queues
549 /// are present, this value can still be dynamically lowered through the
550 /// [`Latency`] setting.
551 ///
552 /// Default is 100ms.
553 ///
554 /// [`need_preempt`]: ExecutorProxy::need_preempt
555 /// [`Latency`]: crate::Latency
556 #[must_use = "The builder must be built to be useful"]
557 pub fn preempt_timer(mut self, dur: Duration) -> LocalExecutorBuilder {
558 self.preempt_timer_duration = dur;
559 self
560 }
561
562 /// Whether to record the latencies of individual IO requests as part of the
563 /// IO stats. Recording latency can be expensive. Disabled by default.
564 #[must_use = "The builder must be built to be useful"]
565 pub fn record_io_latencies(mut self, enabled: bool) -> LocalExecutorBuilder {
566 self.record_io_latencies = enabled;
567 self
568 }
569
570 /// The placement policy of the blocking thread pool.
571 /// Defaults to one thread using the same placement strategy as the host
572 /// executor.
573 #[must_use = "The builder must be built to be useful"]
574 pub fn blocking_thread_pool_placement(
575 mut self,
576 placement: PoolPlacement,
577 ) -> LocalExecutorBuilder {
578 self.blocking_thread_pool_placement = placement;
579 self
580 }
581
582 /// Whether to detect stalls in unyielding tasks.
583 /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
584 /// [`nix::libc::SIGUSR1`], so is disabled by default.
585 /// # Examples
586 ///
587 /// ```
588 /// use glommio::{DefaultStallDetectionHandler, LocalExecutorBuilder};
589 ///
590 /// let local_ex = LocalExecutorBuilder::default()
591 /// .detect_stalls(Some(Box::new(DefaultStallDetectionHandler {})))
592 /// .make()
593 /// .unwrap();
594 /// ```
595 #[must_use = "The builder must be built to be useful"]
596 pub fn detect_stalls(
597 mut self,
598 handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
599 ) -> Self {
600 self.detect_stalls = handler;
601 self
602 }
603
604 /// Make a new [`LocalExecutor`] by taking ownership of the Builder, and
605 /// returns a [`Result`](crate::Result) to the executor.
606 /// # Examples
607 ///
608 /// ```
609 /// use glommio::LocalExecutorBuilder;
610 ///
611 /// let local_ex = LocalExecutorBuilder::default().make().unwrap();
612 /// ```
613 pub fn make(self) -> Result<LocalExecutor> {
614 let notifier = sys::new_sleep_notifier()?;
615 let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?;
616 let mut le = LocalExecutor::new(
617 notifier,
618 cpu_set_gen.next().cpu_binding(),
619 LocalExecutorConfig {
620 io_memory: self.io_memory,
621 ring_depth: self.ring_depth,
622 preempt_timer: self.preempt_timer_duration,
623 record_io_latencies: self.record_io_latencies,
624 spin_before_park: self.spin_before_park,
625 thread_pool_placement: self.blocking_thread_pool_placement,
626 detect_stalls: self.detect_stalls,
627 },
628 )?;
629 le.init();
630 Ok(le)
631 }
632
633 /// Spawn a new [`LocalExecutor`] in a new thread with a given task.
634 ///
635 /// This `spawn` function is an ergonomic shortcut for calling
636 /// `std::thread::spawn`, [`LocalExecutorBuilder::make`] in the spawned
637 /// thread, and then [`LocalExecutor::run`]. This `spawn` function takes
638 /// ownership of a [`LocalExecutorBuilder`] with the configuration for
639 /// the [`LocalExecutor`], spawns that executor in a new thread, and starts
640 /// the task given by `fut_gen()` in that thread.
641 ///
642 /// The indirection of `fut_gen()` here (instead of taking a `Future`)
643 /// allows for futures that may not be `Send`-able once started. As this
644 /// executor is thread-local it can guarantee that the futures will not
645 /// be Sent once started.
646 ///
647 /// # Panics
648 ///
649 /// The newly spawned thread panics if creating the executor fails. If you
650 /// need more fine-grained error handling consider initializing those
651 /// entities manually.
652 ///
653 /// # Example
654 ///
655 /// ```
656 /// use glommio::LocalExecutorBuilder;
657 ///
658 /// let handle = LocalExecutorBuilder::default()
659 /// .spawn(|| async move {
660 /// println!("hello");
661 /// })
662 /// .unwrap();
663 ///
664 /// handle.join().unwrap();
665 /// ```
666 ///
667 /// [`LocalExecutor`]: struct.LocalExecutor.html
668 ///
669 /// [`LocalExecutorBuilder`]: struct.LocalExecutorBuilder.html
670 ///
671 /// [`LocalExecutorBuilder::make`]:
672 /// struct.LocalExecutorBuilder.html#method.make
673 ///
674 /// [`LocalExecutor::run`]:struct.LocalExecutor.html#method.run
675 #[must_use = "This spawns an executor on a thread, so you may need to call \
676 `JoinHandle::join()` to keep the main thread alive"]
677 pub fn spawn<G, F, T>(self, fut_gen: G) -> Result<ExecutorJoinHandle<T>>
678 where
679 G: FnOnce() -> F + Send + 'static,
680 F: Future<Output = T> + 'static,
681 T: Send + 'static,
682 {
683 let notifier = sys::new_sleep_notifier()?;
684 let name = format!("{}-{}", self.name, notifier.id());
685 let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?;
686 let io_memory = self.io_memory;
687 let ring_depth = self.ring_depth;
688 let preempt_timer_duration = self.preempt_timer_duration;
689 let spin_before_park = self.spin_before_park;
690 let detect_stalls = self.detect_stalls;
691 let record_io_latencies = self.record_io_latencies;
692 let blocking_thread_pool_placement = self.blocking_thread_pool_placement;
693
694 Builder::new()
695 .name(name)
696 .spawn(move || {
697 let mut le = LocalExecutor::new(
698 notifier,
699 cpu_set_gen.next().cpu_binding(),
700 LocalExecutorConfig {
701 io_memory,
702 ring_depth,
703 preempt_timer: preempt_timer_duration,
704 record_io_latencies,
705 spin_before_park,
706 thread_pool_placement: blocking_thread_pool_placement,
707 detect_stalls,
708 },
709 )?;
710 le.init();
711 le.run(async move { Ok(fut_gen().await) })
712 })
713 .map_err(Into::into)
714 .map(ExecutorJoinHandle)
715 }
716}
717
718impl Default for LocalExecutorBuilder {
719 fn default() -> Self {
720 Self::new(Placement::Unbound)
721 }
722}
723
/// A factory to configure and create a pool of [`LocalExecutor`]s.
///
/// Configuration methods apply their settings to all [`LocalExecutor`]s in the
/// pool unless otherwise specified. Methods can be chained on the builder in
/// order to configure it. The [`Self::on_all_shards`] method will take
/// ownership of the builder and create a [`PoolThreadHandles`] struct which can
/// be used to join the executor threads.
///
/// # Example
///
/// ```
/// use glommio::{LocalExecutorPoolBuilder, PoolPlacement};
///
/// let handles = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
///     .on_all_shards(|| async move {
///         let id = glommio::executor().id();
///         println!("hello from executor {id}");
///     })
///     .unwrap();
///
/// handles.join_all();
/// ```
pub struct LocalExecutorPoolBuilder {
    /// Spin for duration before parking a reactor
    spin_before_park: Option<Duration>,
    /// A name for the thread-to-be (if any), for identification in panic
    /// messages. Each executor in the pool will use this name followed by
    /// a hyphen and numeric id (e.g. `myname-1`).
    name: String,
    /// Amount of memory to reserve for storage I/O. This will be preallocated
    /// and registered with io_uring. It is still possible to use more than
    /// that, but it will come from the standard allocator and performance
    /// will suffer. Defaults to 10 MiB.
    io_memory: usize,
    /// The depth of the IO rings to create. This influences the level of IO
    /// concurrency. A higher ring depth allows a shard to submit a
    /// greater number of IO requests to the kernel at once.
    ring_depth: usize,
    /// How often to yield to other task queues
    preempt_timer_duration: Duration,
    /// Indicates a policy by which [`LocalExecutor`]s are bound to CPUs.
    placement: PoolPlacement,
    /// Whether to record the latencies of individual IO requests
    record_io_latencies: bool,
    /// The placement policy of the blocking thread pools. Each executor has
    /// its own pool. Defaults to 1 thread per pool, bound using the same
    /// placement strategy as its host executor
    blocking_thread_pool_placement: PoolPlacement,
    /// Factory function to generate the stall detection handler.
    /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
    /// [`nix::libc::SIGUSR1`], so is disabled by default.
    handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
}
777
778impl fmt::Debug for LocalExecutorPoolBuilder {
779 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
780 f.debug_struct("LocalExecutorPoolBuilder")
781 .field("spin_before_park", &self.spin_before_park)
782 .field("name", &self.name)
783 .field("io_memory", &self.io_memory)
784 .field("ring_depth", &self.ring_depth)
785 .field("preempt_timer_duration", &self.preempt_timer_duration)
786 .field("record_io_latencies", &self.record_io_latencies)
787 .field(
788 "blocking_thread_pool_placement",
789 &self.blocking_thread_pool_placement,
790 )
791 .finish_non_exhaustive()
792 }
793}
794
795impl LocalExecutorPoolBuilder {
796 /// Generates the base configuration for spawning a pool of
797 /// [`LocalExecutor`]s, from which configuration methods can be chained.
798 /// The method's only argument is the [`PoolPlacement`] policy by which
799 /// [`LocalExecutor`]s are bound to the machine's hardware topology. i.e.
800 /// how many and which CPUs to use.
801 pub fn new(placement: PoolPlacement) -> Self {
802 Self {
803 spin_before_park: None,
804 name: String::from(DEFAULT_EXECUTOR_NAME),
805 io_memory: DEFAULT_IO_MEMORY,
806 ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
807 preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
808 placement: placement.clone(),
809 record_io_latencies: false,
810 blocking_thread_pool_placement: placement.shrink_to(1),
811 handler_gen: None,
812 }
813 }
814
815 /// Please see documentation under
816 /// [`LocalExecutorBuilder::spin_before_park`] for details. The setting
817 /// is applied to all executors in the pool.
818 #[must_use = "The builder must be built to be useful"]
819 pub fn spin_before_park(mut self, spin: Duration) -> Self {
820 self.spin_before_park = Some(spin);
821 self
822 }
823
824 /// Please see documentation under [`LocalExecutorBuilder::name`] for
825 /// details. The setting is applied to all executors in the pool. Note
826 /// that when a thread is spawned, the `name` is combined with a hyphen
827 /// and numeric id (e.g. `myname-1`) such that each thread has a unique
828 /// name.
829 #[must_use = "The builder must be built to be useful"]
830 pub fn name(mut self, name: &str) -> Self {
831 self.name = String::from(name);
832 self
833 }
834
835 /// Please see documentation under [`LocalExecutorBuilder::io_memory`] for
836 /// details. The setting is applied to all executors in the pool.
837 #[must_use = "The builder must be built to be useful"]
838 pub fn io_memory(mut self, io_memory: usize) -> Self {
839 self.io_memory = io_memory;
840 self
841 }
842
843 /// Please see documentation under [`LocalExecutorBuilder::ring_depth`] for
844 /// details. The setting is applied to all executors in the pool.
845 #[must_use = "The builder must be built to be useful"]
846 pub fn ring_depth(mut self, ring_depth: usize) -> Self {
847 assert!(ring_depth > 0, "ring depth should be strictly positive");
848 self.ring_depth = ring_depth;
849 self
850 }
851
852 /// Please see documentation under [`LocalExecutorBuilder::preempt_timer`]
853 /// for details. The setting is applied to all executors in the pool.
854 #[must_use = "The builder must be built to be useful"]
855 pub fn preempt_timer(mut self, dur: Duration) -> Self {
856 self.preempt_timer_duration = dur;
857 self
858 }
859
860 /// Whether to record the latencies of individual IO requests as part of the
861 /// IO stats. Recording latency can be expensive. Disabled by default.
862 #[must_use = "The builder must be built to be useful"]
863 pub fn record_io_latencies(mut self, enabled: bool) -> Self {
864 self.record_io_latencies = enabled;
865 self
866 }
867
868 /// The placement policy of the blocking thread pool.
869 /// Defaults to one thread using the same placement strategy as the host
870 /// executor.
871 #[must_use = "The builder must be built to be useful"]
872 pub fn blocking_thread_pool_placement(mut self, placement: PoolPlacement) -> Self {
873 self.blocking_thread_pool_placement = placement;
874 self
875 }
876
877 /// Whether to detect stalls in unyielding tasks.
878 /// This method takes a closure of `handler_gen`, which will be called on
879 /// each new thread to generate the stall detection handler to be used in
880 /// that executor. [`stall::DefaultStallDetectionHandler`] installs a signal
881 /// handler for [`nix::libc::SIGUSR1`], so is disabled by default.
882 /// # Examples
883 ///
884 /// ```
885 /// use glommio::{
886 /// timer::Timer,
887 /// DefaultStallDetectionHandler,
888 /// LocalExecutorPoolBuilder,
889 /// PoolPlacement,
890 /// };
891 ///
892 /// let local_ex = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
893 /// .detect_stalls(Some(Box::new(|| Box::new(DefaultStallDetectionHandler {}))))
894 /// .on_all_shards(move || async {
895 /// Timer::new(std::time::Duration::from_millis(100)).await;
896 /// println!("Hello world!");
897 /// })
898 /// .expect("failed to spawn local executors")
899 /// .join_all();
900 /// ```
901 #[must_use = "The builder must be built to be useful"]
902 pub fn detect_stalls(
903 mut self,
904 handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
905 ) -> Self {
906 self.handler_gen = handler_gen;
907 self
908 }
909
910 /// Spawn a pool of [`LocalExecutor`]s in a new thread according to the
911 /// [`PoolPlacement`] policy, which is `Unbound` by default.
912 ///
913 /// This method is the pool equivalent of [`LocalExecutorBuilder::spawn`].
914 ///
915 /// The method takes a closure `fut_gen` which will be called on each new
916 /// thread to obtain the [`Future`] to be executed there.
917 ///
918 /// # Panics
919 ///
920 /// The newly spawned thread panics if creating the executor fails. If you
921 /// need more fine-grained error handling consider initializing those
922 /// entities manually.
923 #[must_use = "This spawns executors on multiple threads; threads may fail to spawn or you may \
924 need to call `PoolThreadHandles::join_all()` to keep the main thread alive"]
925 pub fn on_all_shards<G, F, T>(self, fut_gen: G) -> Result<PoolThreadHandles<T>>
926 where
927 G: FnOnce() -> F + Clone + Send + 'static,
928 F: Future<Output = T> + 'static,
929 T: Send + 'static,
930 {
931 let mut handles = PoolThreadHandles::new();
932 let nr_shards = self.placement.executor_count();
933 let mut cpu_set_gen = placement::CpuSetGenerator::pool(self.placement.clone())?;
934 let latch = Latch::new(nr_shards);
935
936 for _ in 0..nr_shards {
937 match self.spawn_thread(&mut cpu_set_gen, &latch, fut_gen.clone()) {
938 Ok(handle) => handles.push(handle),
939 Err(err) => {
940 handles.join_all();
941 return Err(err);
942 }
943 }
944 }
945
946 Ok(handles)
947 }
948
949 /// Spawns a thread
950 fn spawn_thread<G, F, T>(
951 &self,
952 cpu_set_gen: &mut placement::CpuSetGenerator,
953 latch: &Latch,
954 fut_gen: G,
955 ) -> Result<JoinHandle<Result<T>>>
956 where
957 G: FnOnce() -> F + Clone + Send + 'static,
958 F: Future<Output = T> + 'static,
959 T: Send + 'static,
960 {
961 // NOTE: `self.placement` was `std::mem::take`en in `Self::on_all_shards`; you
962 // should no longer rely on its value at this point
963 let cpu_binding = cpu_set_gen.next().cpu_binding();
964 let notifier = sys::new_sleep_notifier()?;
965 let name = format!("{}-{}", self.name, notifier.id());
966 let handle = Builder::new().name(name).spawn({
967 let io_memory = self.io_memory;
968 let ring_depth = self.ring_depth;
969 let preempt_timer_duration = self.preempt_timer_duration;
970 let spin_before_park = self.spin_before_park;
971 let record_io_latencies = self.record_io_latencies;
972 let blocking_thread_pool_placement = self.blocking_thread_pool_placement.clone();
973 let detect_stalls = self.handler_gen.as_ref().map(|x| (*x.deref())());
974 let latch = Latch::clone(latch);
975
976 move || {
977 // only allow the thread to create the `LocalExecutor` if all other threads that
978 // are supposed to be created by the pool builder were successfully spawned
979 if latch.arrive_and_wait() == LatchState::Ready {
980 let mut le = LocalExecutor::new(
981 notifier,
982 cpu_binding,
983 LocalExecutorConfig {
984 io_memory,
985 ring_depth,
986 preempt_timer: preempt_timer_duration,
987 record_io_latencies,
988 spin_before_park,
989 thread_pool_placement: blocking_thread_pool_placement,
990 detect_stalls,
991 },
992 )?;
993 le.init();
994 le.run(async move { Ok(fut_gen().await) })
995 } else {
996 // this `Err` isn't visible to the user; the pool builder directly returns an
997 // `Err` from the `std::thread::Builder`
998 Err(io::Error::new(io::ErrorKind::Other, "spawn failed").into())
999 }
1000 }
1001 });
1002
1003 match handle {
1004 Ok(h) => Ok(h),
1005 Err(e) => {
1006 // The `std::thread::Builder` was unable to spawn the thread and retuned an
1007 // `Err`, so we notify other threads to let them know they
1008 // should not proceed with constructing their `LocalExecutor`s
1009 latch.cancel().expect("unreachable: latch was ready");
1010
1011 Err(e.into())
1012 }
1013 }
1014 }
1015}
1016
1017/// Holds a collection of [`JoinHandle`]s.
1018///
1019/// This struct is returned by [`LocalExecutorPoolBuilder::on_all_shards`].
1020#[derive(Debug)]
1021pub struct PoolThreadHandles<T> {
1022 handles: Vec<JoinHandle<Result<T>>>,
1023}
1024
1025impl<T> PoolThreadHandles<T> {
1026 fn new() -> Self {
1027 Self {
1028 handles: Vec::new(),
1029 }
1030 }
1031
1032 fn push(&mut self, handle: JoinHandle<Result<T>>) {
1033 self.handles.push(handle)
1034 }
1035
1036 /// Obtain a reference to the `JoinHandle`s.
1037 pub fn handles(&self) -> &Vec<JoinHandle<Result<T>>> {
1038 &self.handles
1039 }
1040
1041 /// Calls [`JoinHandle::join`] on all handles.
1042 pub fn join_all(self) -> Vec<Result<T>> {
1043 self.handles
1044 .into_iter()
1045 .map(|h| {
1046 match h.join() {
1047 Ok(ok @ Ok(_)) => ok,
1048 // this variant is unreachable since `Err` is only returned from a thread if
1049 // another thread failed to spawn; `LocalExecutorPoolBuilder::on_all_shards`
1050 // returns an immediate `Err` if any thread fails to spawn, so
1051 // `PoolThreadHandles` would never be created
1052 Ok(err @ Err(_)) => err,
1053 Err(e) => Err(GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(e))),
1054 }
1055 })
1056 .collect::<Vec<_>>()
1057 }
1058}
1059
1060pub(crate) fn maybe_activate(tq: Rc<RefCell<TaskQueue>>) {
1061 #[cfg(not(feature = "native-tls"))]
1062 LOCAL_EX.with(|local_ex| {
1063 let mut queues = local_ex.queues.borrow_mut();
1064 queues.maybe_activate(tq);
1065 });
1066
1067 #[cfg(feature = "native-tls")]
1068 unsafe {
1069 let mut queues = LOCAL_EX
1070 .as_ref()
1071 .expect("this thread doesn't have a LocalExecutor running")
1072 .queues
1073 .borrow_mut();
1074 queues.maybe_activate(tq);
1075 };
1076}
1077
/// Settings consumed by `LocalExecutor::new` when it builds an executor.
pub struct LocalExecutorConfig {
    /// Forwarded to `Reactor::new` as the reactor's I/O memory setting.
    pub io_memory: usize,
    /// Forwarded to `Reactor::new` as the ring depth.
    pub ring_depth: usize,
    /// Preempt timer duration handed to `ExecutorQueues::new`.
    pub preempt_timer: Duration,
    /// Forwarded to `Reactor::new`; presumably toggles the recording of I/O
    /// latencies (confirm against the reactor).
    pub record_io_latencies: bool,
    /// How long the executor spins polling for I/O before it parks. It is
    /// reset to `None` when the executor is created without a CPU binding.
    pub spin_before_park: Option<Duration>,
    /// Placement handed to `BlockingThreadPool::new`.
    pub thread_pool_placement: PoolPlacement,
    /// Handler used to build the stall detector; `None` means that no stall
    /// detector is created.
    pub detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}
1087
/// Single-threaded executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutor;
///
/// let local_ex = LocalExecutor::default();
///
/// local_ex.run(async {
///     println!("Hello world!");
/// });
/// ```
///
/// In many cases, use of [`LocalExecutorBuilder`] will provide more
/// configuration options and more ergonomic methods. See
/// [`LocalExecutorBuilder::spawn`] for examples.
///
/// [`LocalExecutorBuilder`]: struct.LocalExecutorBuilder.html
///
/// [`LocalExecutorBuilder::spawn`]:
/// struct.LocalExecutorBuilder.html#method.spawn
#[derive(Debug)]
pub struct LocalExecutor {
    // Task queues registered with this executor plus the scheduling state
    // shared between them.
    queues: Rc<RefCell<ExecutorQueues>>,
    // Used by `run` to poll for I/O and to park the thread when idle.
    parker: parking::Parker,
    // Identifier of this executor; taken from the sleep notifier it was
    // created with.
    id: usize,
    // The reactor this executor drives.
    reactor: Rc<reactor::Reactor>,
    // Stall detector, if stall detection is enabled; can be swapped at runtime
    // through `detect_stalls`.
    stall_detector: RefCell<Option<StallDetector>>,
}
1120
1121impl LocalExecutor {
1122 fn get_reactor(&self) -> Rc<Reactor> {
1123 self.reactor.clone()
1124 }
1125
1126 fn init(&mut self) {
1127 let io_requirements = IoRequirements::new(Latency::NotImportant, 0);
1128 self.queues.borrow_mut().available_executors.insert(
1129 0,
1130 TaskQueue::new(
1131 Default::default(),
1132 "default",
1133 Shares::Static(1000),
1134 io_requirements,
1135 ),
1136 );
1137 }
1138
1139 fn new(
1140 notifier: Arc<sys::SleepNotifier>,
1141 cpu_binding: Option<impl IntoIterator<Item = usize>>,
1142 mut config: LocalExecutorConfig,
1143 ) -> Result<LocalExecutor> {
1144 let blocking_thread =
1145 BlockingThreadPool::new(config.thread_pool_placement, notifier.clone())?;
1146
1147 // Linux's default memory policy is "local allocation" which allocates memory
1148 // on the NUMA node containing the CPU where the allocation takes place.
1149 // Hence, we bind to a CPU in the provided CPU set before allocating any
1150 // memory for the `LocalExecutor`, thereby allowing any access to these
1151 // data structures to occur on a local NUMA node (nevertheless, for some
1152 // `Placement` variants a CPU set could span multiple NUMA nodes).
1153 // For additional information see:
1154 // https://www.kernel.org/doc/html/latest/admin-guide/mm/numa_memory_policy.html
1155 match cpu_binding {
1156 Some(cpu_set) => bind_to_cpu_set(cpu_set)?,
1157 None => config.spin_before_park = None,
1158 }
1159 let p = parking::Parker::new();
1160 let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park);
1161 let id = notifier.id();
1162 trace!(id = id, "Creating executor");
1163 Ok(LocalExecutor {
1164 queues: Rc::new(RefCell::new(queues)),
1165 parker: p,
1166 id,
1167 reactor: Rc::new(reactor::Reactor::new(
1168 notifier,
1169 config.io_memory,
1170 config.ring_depth,
1171 config.record_io_latencies,
1172 blocking_thread,
1173 )?),
1174 stall_detector: RefCell::new(
1175 config
1176 .detect_stalls
1177 .map(|x| StallDetector::new(id, x))
1178 .transpose()?,
1179 ),
1180 })
1181 }
1182
1183 /// Enable or disable task stall detection at runtime
1184 ///
1185 /// # Examples
1186 /// ```
1187 /// use glommio::{DefaultStallDetectionHandler, LocalExecutor};
1188 ///
1189 /// let local_ex =
1190 /// LocalExecutor::default().detect_stalls(Some(Box::new(DefaultStallDetectionHandler {})));
1191 /// ```
1192 pub fn detect_stalls(
1193 &self,
1194 handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
1195 ) -> Result<()> {
1196 self.stall_detector.replace(
1197 handler
1198 .map(|x| StallDetector::new(self.id, x))
1199 .transpose()?,
1200 );
1201 Ok(())
1202 }
1203
    /// Returns a unique identifier for this Executor.
    ///
    /// The value is the id of the sleep notifier the executor was created
    /// with.
    ///
    /// # Examples
    /// ```
    /// use glommio::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::default();
    /// println!("My ID: {}", local_ex.id());
    /// ```
    pub fn id(&self) -> usize {
        self.id
    }
1216
1217 fn create_task_queue<S>(&self, shares: Shares, latency: Latency, name: S) -> TaskQueueHandle
1218 where
1219 S: Into<String>,
1220 {
1221 let index = {
1222 let mut ex = self.queues.borrow_mut();
1223 let index = ex.executor_index;
1224 ex.executor_index += 1;
1225 index
1226 };
1227
1228 let io_requirements = IoRequirements::new(latency, index);
1229 let tq = TaskQueue::new(TaskQueueHandle { index }, name, shares, io_requirements);
1230
1231 self.queues
1232 .borrow_mut()
1233 .available_executors
1234 .insert(index, tq);
1235 TaskQueueHandle { index }
1236 }
1237
1238 /// Removes a task queue.
1239 ///
1240 /// The task queue cannot be removed if there are still pending tasks.
1241 pub fn remove_task_queue(&self, handle: TaskQueueHandle) -> Result<()> {
1242 let mut queues = self.queues.borrow_mut();
1243
1244 let queue_entry = queues.available_executors.entry(handle.index);
1245 if let Entry::Occupied(entry) = queue_entry {
1246 let tq = entry.get();
1247 if tq.borrow().is_active() {
1248 return Err(GlommioError::queue_still_active(handle.index));
1249 }
1250
1251 entry.remove();
1252 return Ok(());
1253 }
1254 Err(GlommioError::queue_not_found(handle.index))
1255 }
1256
1257 fn get_queue(&self, handle: &TaskQueueHandle) -> Option<Rc<RefCell<TaskQueue>>> {
1258 self.queues
1259 .borrow()
1260 .available_executors
1261 .get(&handle.index)
1262 .cloned()
1263 }
1264
1265 fn current_task_queue(&self) -> TaskQueueHandle {
1266 self.queues
1267 .borrow()
1268 .active_executing
1269 .as_ref()
1270 .unwrap()
1271 .borrow()
1272 .stats
1273 .index
1274 }
1275
1276 fn mark_me_for_yield(&self) {
1277 let queues = self.queues.borrow();
1278 let mut me = queues.active_executing.as_ref().unwrap().borrow_mut();
1279 me.yielded = true;
1280 }
1281
1282 fn spawn<T>(&self, future: impl Future<Output = T>) -> multitask::Task<T> {
1283 let tq = self
1284 .queues
1285 .borrow()
1286 .active_executing
1287 .clone() // this clone is cheap because we clone an `Option<Rc<_>>`
1288 .or_else(|| self.get_queue(&TaskQueueHandle { index: 0 }))
1289 .unwrap();
1290
1291 let id = self.id;
1292 let ex = tq.borrow().ex.clone();
1293 ex.spawn_and_run(id, tq, future)
1294 }
1295
1296 fn spawn_into<T, F>(&self, future: F, handle: TaskQueueHandle) -> Result<multitask::Task<T>>
1297 where
1298 F: Future<Output = T>,
1299 {
1300 let tq = self
1301 .get_queue(&handle)
1302 .ok_or_else(|| GlommioError::queue_not_found(handle.index))?;
1303 let ex = tq.borrow().ex.clone();
1304 let id = self.id;
1305
1306 // can't run right away, because we need to cross into a different task queue
1307 Ok(ex.spawn_and_schedule(id, tq, future))
1308 }
1309
1310 fn preempt_timer_duration(&self) -> Duration {
1311 self.queues.borrow().preempt_timer_duration
1312 }
1313
1314 fn spin_before_park(&self) -> Option<Duration> {
1315 self.queues.borrow().spin_before_park
1316 }
1317
    /// Asks the reactor whether preemption is needed.
    #[inline(always)]
    pub(crate) fn need_preempt(&self) -> bool {
        self.reactor.need_preempt()
    }
1322
1323 fn run_task_queues(&self) -> bool {
1324 let mut ran = false;
1325 loop {
1326 self.reactor.sys.install_eventfd();
1327 if self.need_preempt() {
1328 break;
1329 }
1330 if !self.run_one_task_queue() {
1331 return false;
1332 } else {
1333 ran = true;
1334 }
1335 }
1336 ran
1337 }
1338
    /// Pops the next task queue from `active_executors` and runs its tasks
    /// until preemption is needed, the queue has yielded, or it has no more
    /// tasks to hand out.
    ///
    /// Afterwards the queue's vruntime and the executor statistics are
    /// updated, and the queue is pushed back if it is still active.
    ///
    /// Returns `false` if there was no task queue to run, `true` otherwise.
    fn run_one_task_queue(&self) -> bool {
        let mut tq = self.queues.borrow_mut();
        let candidate = tq.active_executors.pop();
        // Counted even when there turns out to be nothing to run.
        tq.stats.scheduler_runs += 1;

        if candidate.is_none() {
            return false;
        }

        let queue = candidate.unwrap();
        tq.active_executing = Some(queue.clone());
        // Release the borrow on the queue set before any task runs: tasks reach
        // back into the executor (e.g. to spawn) and borrow it again.
        drop(tq);

        let time = {
            let now = Instant::now();
            let mut queue_ref = queue.borrow_mut();
            queue_ref.prepare_to_run(now);
            self.reactor
                .inform_io_requirements(queue_ref.io_requirements);
            now
        };

        let (runtime, tasks_executed_this_loop) = {
            let detector = self.stall_detector.borrow();
            // When stall detection is enabled, tell the detector that this task
            // queue is being entered; the returned guard is dropped once the
            // queue is done running.
            let guard = detector.as_ref().map(|x| {
                let queue = queue.borrow_mut();
                x.enter_task_queue(
                    queue.stats.index,
                    queue.name.clone(),
                    time,
                    self.preempt_timer_duration(),
                )
            });

            let mut tasks_executed_this_loop = 0;
            loop {
                let mut queue_ref = queue.borrow_mut();
                if self.need_preempt() || queue_ref.yielded() {
                    break;
                }

                if let Some(r) = queue_ref.get_task() {
                    // The task may touch its own queue, so the borrow must be
                    // released before it runs.
                    drop(queue_ref);
                    r.run();
                    tasks_executed_this_loop += 1;
                } else {
                    break;
                }
            }
            let elapsed = time.elapsed();
            drop(guard);
            (elapsed, tasks_executed_this_loop)
        };

        let (need_repush, vruntime) = {
            let mut state = queue.borrow_mut();
            let last_vruntime = state.account_vruntime(runtime);
            (state.is_active(), last_vruntime)
        };

        let mut tq = self.queues.borrow_mut();
        tq.active_executing = None;
        tq.stats.executor_runtime += runtime;
        tq.stats.tasks_executed += tasks_executed_this_loop;
        let vruntime = match vruntime {
            Some(x) => x,
            None => {
                // `account_vruntime` returned `None`: reset the vruntime of every
                // registered queue to zero.
                // NOTE(review): presumably `None` signals an overflow — confirm
                // against `account_vruntime`.
                for queue in tq.available_executors.values() {
                    let mut q = queue.borrow_mut();
                    q.vruntime = 0;
                }
                0
            }
        };

        if need_repush {
            tq.active_executors.push(queue);
        } else {
            tq.reevaluate_preempt_timer();
        }

        // Compute the smallest vruntime out of all the active task queues
        // This value is used to set the vruntime of deactivated task queues when they
        // are woken up.
        tq.default_vruntime = tq
            .active_executors
            .peek()
            .map(|x| x.borrow().vruntime)
            .unwrap_or(vruntime);

        true
    }
1431
    /// Runs the executor until the given future completes.
    ///
    /// The future is spawned into the default task queue. The executor then
    /// alternates between polling for I/O and running task queues, and parks
    /// the thread when there is nothing left to run and no I/O to process.
    ///
    /// # Panics
    ///
    /// Panics if a `LocalExecutor` is already running on this thread, if the
    /// given future panics, or if polling for I/O or parking fails.
    ///
    /// # Examples
    ///
    /// ```
    /// use glommio::{LocalExecutor, Task};
    ///
    /// let local_ex = LocalExecutor::default();
    ///
    /// let res = local_ex.run(async {
    ///     let task = glommio::spawn_local(async { 1 + 2 });
    ///     task.await * 2
    /// });
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let run = |this: &Self| {
            // this waker is never exposed in the public interface and is only used to check
            // whether the task's `JoinHandle` is `Ready`
            let waker = dummy_waker();
            let cx = &mut Context::from_waker(&waker);

            // No configured spin time means a zero duration, i.e. park as soon
            // as polling finds nothing.
            let spin_before_park = self.spin_before_park().unwrap_or_default();

            let future = this
                .spawn_into(future, TaskQueueHandle::default())
                .unwrap()
                .detach();
            pin!(future);

            let mut pre_time = Instant::now();
            loop {
                if let Poll::Ready(t) = future.as_mut().poll(cx) {
                    // can't be canceled, and join handle is None only upon
                    // cancellation or panic. So in case of panic this just propagates
                    let cur_time = Instant::now();
                    this.queues.borrow_mut().stats.total_runtime += cur_time - pre_time;
                    break t.unwrap();
                }

                // We want to do I/O before we call run_task_queues,
                // for the benefit of the latency ring. If there are pending
                // requests that are latency sensitive we want them out of the
                // ring ASAP (before we run the task queues). We will also use
                // the opportunity to install the timer.
                this.parker
                    .poll_io(|| Some(this.preempt_timer_duration()))
                    .expect("Failed to poll io! This is actually pretty bad!");

                // run user code
                let run = this.run_task_queues();

                // account for runtime and poll/sleep if possible
                let cur_time = Instant::now();
                this.queues.borrow_mut().stats.total_runtime += cur_time - pre_time;
                pre_time = cur_time;
                if !run {
                    if let Poll::Ready(t) = future.as_mut().poll(cx) {
                        // It may be that we just became ready now that the task queue
                        // is exhausted. But if we sleep (park) we'll never know so we
                        // test again here. We can't test *just* here because the main
                        // future is probably the one setting up the task queues and etc.
                        break t.unwrap();
                    } else {
                        // Keep polling for I/O; once the spin budget is used up
                        // without anything arriving, park the thread.
                        while !this.reactor.spin_poll_io().unwrap() {
                            if pre_time.elapsed() > spin_before_park {
                                this.parker
                                    .park()
                                    .expect("Failed to park! This is actually pretty bad!");
                                break;
                            }
                        }
                        // reset the timer so that the time spent spinning or parked
                        // is not counted as runtime
                        pre_time = Instant::now();
                    }
                }
            }
        };

        // Register this executor as the one running on the current thread for
        // the duration of `run`.
        #[cfg(not(feature = "native-tls"))]
        {
            assert!(
                !LOCAL_EX.is_set(),
                "There is already an LocalExecutor running on this thread"
            );
            LOCAL_EX.set(self, || run(self))
        }

        #[cfg(feature = "native-tls")]
        unsafe {
            assert!(
                LOCAL_EX.is_null(),
                "There is already an LocalExecutor running on this thread"
            );

            // Reset the pointer when leaving this block, whichever way it is
            // left.
            defer!(LOCAL_EX = std::ptr::null());
            LOCAL_EX = self as *const Self;
            run(self)
        }
    }
1533}
1534
/// Creates a single-threaded executor with default settings on the current
/// thread.
///
/// This creates an executor using the default parameters of
/// `LocalExecutorBuilder`. If you want to customize it further, use
/// `LocalExecutorBuilder` directly instead.
///
/// # Panics
///
/// Panics if creating the executor fails; use `LocalExecutorBuilder::make` to
/// recover from such errors.
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutor;
///
/// let local_ex = LocalExecutor::default();
/// ```
impl Default for LocalExecutor {
    fn default() -> Self {
        LocalExecutorBuilder::new(Placement::Unbound)
            .make()
            .unwrap()
    }
}
1561
/// A spawned future that can be detached
///
/// Tasks are also futures themselves and yield the output of the spawned
/// future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To
/// cancel a task a bit more gracefully and wait until it stops running, use the
/// [`cancel()`][`Task::cancel()`] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also
/// causes a panic.
///
/// # Examples
///
/// ```
/// # use glommio::{LocalExecutor, Task};
/// #
/// # let ex = LocalExecutor::default();
/// #
/// # ex.run(async {
/// let task = glommio::spawn_local(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// Note that there is no guarantee of ordering when reasoning about when a
/// task runs, as that is an implementation detail.
///
/// In particular, acquiring a borrow and holding it across a task spawn may
/// sometimes work but may also panic, depending on scheduling decisions, so
/// it is still illegal.
///
/// ```no_run
/// # use glommio::{LocalExecutor, Task};
/// # use std::rc::Rc;
/// # use std::cell::RefCell;
/// #
/// # let ex = LocalExecutor::default();
/// #
/// # ex.run(async {
/// let example = Rc::new(RefCell::new(0));
/// let exclone = example.clone();
///
/// let mut ex_mut = example.borrow_mut();
/// *ex_mut = 1;
///
/// let task = glommio::spawn_local(async move {
///     let ex = exclone.borrow();
///     println!("Current value: {ex}");
/// });
///
/// // This is fine if `task` executes after the current task, but will panic if
/// // it preempts the current task and executes first. This is therefore invalid.
/// *ex_mut = 2;
/// drop(ex_mut);
///
/// task.await;
/// # });
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
1628
1629impl<T> Task<T> {
1630 /// Detaches the task to let it keep running in the background.
1631 ///
1632 /// # Examples
1633 ///
1634 /// ```
1635 /// use futures_lite::future;
1636 /// use glommio::{timer::Timer, LocalExecutor};
1637 ///
1638 /// let ex = LocalExecutor::default();
1639 /// ex.run(async {
1640 /// glommio::spawn_local(async {
1641 /// loop {
1642 /// println!("I'm a background task looping forever.");
1643 /// glommio::executor().yield_task_queue_now().await;
1644 /// }
1645 /// })
1646 /// .detach();
1647 /// Timer::new(std::time::Duration::from_micros(100)).await;
1648 /// })
1649 /// ```
1650 pub fn detach(self) -> task::JoinHandle<T> {
1651 self.0.detach()
1652 }
1653
1654 /// Cancels the task and waits for it to stop running.
1655 ///
1656 /// Returns the task's output if it was completed just before it got
1657 /// canceled, or [`None`] if it didn't complete.
1658 ///
1659 /// While it's possible to simply drop the [`Task`] to cancel it, this is a
1660 /// cleaner way of canceling because it also waits for the task to stop
1661 /// running.
1662 ///
1663 /// # Examples
1664 ///
1665 /// ```
1666 /// use futures_lite::future;
1667 /// use glommio::LocalExecutor;
1668 ///
1669 /// let ex = LocalExecutor::default();
1670 ///
1671 /// ex.run(async {
1672 /// let task = glommio::spawn_local(async {
1673 /// loop {
1674 /// println!("Even though I'm in an infinite loop, you can still cancel me!");
1675 /// future::yield_now().await;
1676 /// }
1677 /// });
1678 ///
1679 /// task.cancel().await;
1680 /// });
1681 /// ```
1682 pub async fn cancel(self) -> Option<T> {
1683 self.0.cancel().await
1684 }
1685}
1686
1687impl<T> Future for Task<T> {
1688 type Output = T;
1689
1690 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1691 Pin::new(&mut self.0).poll(cx)
1692 }
1693}
1694
/// A spawned future that cannot be detached, and has a predictable lifetime.
///
/// Because their lifetimes are bounded, you don't need to make sure that data
/// you pass to the `ScopedTask` is `'static`, which can be cheaper (no need to
/// reference count). If you, however, would like to `.detach` this task and
/// have it run in the background, consider using [`Task`] instead.
///
/// Tasks are also futures themselves and yield the output of the spawned
/// future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To
/// cancel a task a bit more gracefully and wait until it stops running, use the
/// [`cancel()`][`ScopedTask::cancel()`] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also
/// causes a panic.
///
/// # Safety
///
/// `ScopedTask` is safe to use so long as it is guaranteed to be either awaited
/// or dropped. Rust does not guarantee that destructors will be called, and if
/// they are not, `ScopedTask`s can be kept alive after the scope is terminated.
///
/// Typically, the only situations in which `drop` is not executed are:
///
/// * If you manually choose not to, with [`std::mem::forget`] or
///   [`ManuallyDrop`].
/// * If cyclic reference counts prevent the task from being destroyed.
///
/// If you believe any of the above situations are present (the first one is,
/// of course, considerably easier to spot), avoid using the `ScopedTask`.
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutor;
///
/// let ex = LocalExecutor::default();
///
/// ex.run(async {
///     let a = 2;
///     let task = unsafe {
///         glommio::spawn_scoped_local(async {
///             println!("Hello from a task!");
///             1 + a // this is a reference, and it works just fine
///         })
///     };
///
///     assert_eq!(task.await, 3);
/// });
/// ```
///
/// The usual borrow checker rules apply. A [`ScopedTask`] can acquire a mutable
/// reference to a variable just fine:
///
/// ```
/// # use glommio::{LocalExecutor};
/// #
/// # let ex = LocalExecutor::default();
/// # ex.run(async {
/// let mut a = 2;
/// let task = unsafe {
///     glommio::spawn_scoped_local(async {
///         a = 3;
///     })
/// };
/// task.await;
/// assert_eq!(a, 3);
/// # });
/// ```
///
/// But until the task completes, the reference is mutably held, so we can no
/// longer immutably reference it:
///
/// ```compile_fail
/// # use glommio::LocalExecutor;
/// #
/// # let ex = LocalExecutor::default();
/// # ex.run(async {
/// let mut a = 2;
/// let task = unsafe {
///     glommio::spawn_scoped_local(async {
///         a = 3;
///     })
/// };
/// assert_eq!(a, 3); // task hasn't completed yet!
/// task.await;
/// # });
/// ```
///
/// You can still use [`Cell`] and [`RefCell`] normally to work around this.
/// Just keep in mind that there is no guarantee of ordering for execution of
/// tasks, and if the task has not yet finished the value may or may not have
/// changed (as with any interior mutability)
///
/// ```
/// # use glommio::{LocalExecutor};
/// # use std::cell::Cell;
/// #
/// # let ex = LocalExecutor::default();
/// # ex.run(async {
/// let a = Cell::new(2);
/// let task = unsafe {
///     glommio::spawn_scoped_local(async {
///         a.set(3);
///     })
/// };
///
/// assert!(a.get() == 3 || a.get() == 2); // impossible to know if it will be 2 or 3
/// task.await;
/// assert_eq!(a.get(), 3); // The task finished now.
/// //
/// # });
/// ```
///
/// The following code, however, will access invalid memory as drop is never
/// executed
///
/// ```no_run
/// # use glommio::{LocalExecutor};
/// # use std::cell::Cell;
/// #
/// # let ex = LocalExecutor::default();
/// # ex.run(async {
/// {
///     let a = &mut "mayhem";
///     let task = unsafe {
///         glommio::spawn_scoped_local(async {
///             *a = "doom";
///         })
///     };
///     std::mem::forget(task);
/// }
/// # });
/// ```
///
/// [`Task`]: crate::Task
/// [`Cell`]: std::cell::Cell
/// [`RefCell`]: std::cell::RefCell
/// [`std::mem::forget`]: std::mem::forget
/// [`ManuallyDrop`]: std::mem::ManuallyDrop
#[must_use = "scoped tasks get canceled when dropped, use a standard Task and `.detach()` to run \
              them in the background"]
#[derive(Debug)]
pub struct ScopedTask<'a, T>(multitask::Task<T>, PhantomData<&'a T>);
1839
1840impl<'a, T> ScopedTask<'a, T> {
1841 /// Cancels the task and waits for it to stop running.
1842 ///
1843 /// Returns the task's output if it was completed just before it got
1844 /// canceled, or [`None`] if it didn't complete.
1845 ///
1846 /// While it's possible to simply drop the [`ScopedTask`] to cancel it, this
1847 /// is a cleaner way of canceling because it also waits for the task to
1848 /// stop running.
1849 ///
1850 /// # Examples
1851 ///
1852 /// ```
1853 /// use futures_lite::future;
1854 /// use glommio::LocalExecutor;
1855 ///
1856 /// let ex = LocalExecutor::default();
1857 ///
1858 /// ex.run(async {
1859 /// let task = unsafe {
1860 /// glommio::spawn_scoped_local(async {
1861 /// loop {
1862 /// println!("Even though I'm in an infinite loop, you can still cancel me!");
1863 /// future::yield_now().await;
1864 /// }
1865 /// })
1866 /// };
1867 ///
1868 /// task.cancel().await;
1869 /// });
1870 /// ```
1871 pub async fn cancel(self) -> Option<T> {
1872 self.0.cancel().await
1873 }
1874}
1875
1876impl<'a, T> Future for ScopedTask<'a, T> {
1877 type Output = T;
1878
1879 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1880 Pin::new(&mut self.0).poll(cx)
1881 }
1882}
1883
/// Conditionally yields the current task queue. The scheduler may then
/// process other task queues according to their latency requirements.
/// If a call to this function causes the current queue to yield,
/// then the calling task is moved to the back of the yielded task
/// queue.
///
/// The condition under which this function yields is an implementation
/// detail subject to change, but it will always be somehow related to the
/// latency guarantees that the task queues want to uphold in their
/// `Latency::Matters` parameter (or `Latency::NotImportant`).
///
/// This function is the central mechanism of task cooperation in Glommio
/// and should be preferred over unconditional yielding methods like
/// [`ExecutorProxy::yield_now`] and
/// [`ExecutorProxy::yield_task_queue_now`].
///
/// Proxy to `ExecutorProxy::yield_if_needed`.
#[inline(always)]
pub async fn yield_if_needed() {
    executor().yield_if_needed().await
}
1903
/// Spawns a task onto the current single-threaded executor.
///
/// If called from a [`LocalExecutor`], the task is spawned on it.
/// Otherwise, this method panics.
///
/// Note that there is no guarantee of when the spawned task is scheduled.
/// The current task can continue its execution or be preempted by the
/// newly spawned task immediately. See the documentation for the
/// top-level [`Task`] for examples.
///
/// Proxy to [`ExecutorProxy::spawn_local`]
///
/// # Examples
///
/// ```
/// use glommio::{LocalExecutor, Task};
///
/// let local_ex = LocalExecutor::default();
///
/// local_ex.run(async {
///     let task = glommio::spawn_local(async { 1 + 2 });
///     assert_eq!(task.await, 3);
/// });
/// ```
pub fn spawn_local<T>(future: impl Future<Output = T> + 'static) -> Task<T>
where
    T: 'static,
{
    // Delegate to the executor running on this thread.
    executor().spawn_local(future)
}
1934
/// Allocates a buffer that is suitable for writing to a Direct Memory Access
/// (DMA) file such as [`DmaFile`].
///
/// Please note that this implementation uses an embedded buddy allocator to
/// speed up the allocation of memory chunks, but the same allocator is also
/// used to serve the memory needed to write/read data from `uring`, so it is
/// probably not a good idea to keep the allocated memory for a long time.
/// If you want to keep the allocated buffer for a long time, please use
/// [`crate::allocate_dma_buffer_global`] instead.
///
/// Be careful when you use this buffer with a DMA file: the size and position
/// of the buffer should be properly aligned to the block size of the device
/// where the file is located.
///
/// * `size` size of the requested buffer in bytes
///
/// [`DmaFile`]: crate::io::DmaFile
pub fn allocate_dma_buffer(size: usize) -> DmaBuffer {
    // The buffer comes from the reactor of the executor running on this thread.
    executor().reactor().alloc_dma_buffer(size)
}
1952
/// Allocates a buffer that is suitable for writing to a Direct Memory Access
/// (DMA) file such as [`DmaFile`]. If you do not plan to keep the allocated
/// buffer for a long time, please use [`crate::allocate_dma_buffer`] instead.
///
/// Be careful when you use this buffer with a DMA file: the size and position
/// of the buffer should be properly aligned to the block size of the device
/// where the file is located.
///
/// * `size` size of the requested buffer in bytes
///
/// # Panics
///
/// Panics if `DmaBuffer::new` does not produce a buffer.
///
/// [`DmaFile`]: crate::io::DmaFile
pub fn allocate_dma_buffer_global(size: usize) -> DmaBuffer {
    DmaBuffer::new(size).unwrap()
}
1966
1967/// Spawns a task onto the current single-threaded executor, in a particular
1968/// task queue
1969///
1970/// If called from a [`LocalExecutor`], the task is spawned on it.
1971/// Otherwise, this method panics.
1972///
1973/// Note that there is no guarantee of when the spawned task is scheduled.
1974/// The current task can continue its execution or be preempted by the
1975/// newly spawned task immediately. See the documentation for the
1976/// top-level [`Task`] for examples.
1977///
1978/// Proxy to [`ExecutorProxy::spawn_local_into`]
1979///
1980/// # Examples
1981///
1982/// ```
1983/// # use glommio::{ LocalExecutor, Shares, Task};
1984///
1985/// # let local_ex = LocalExecutor::default();
1986/// # local_ex.run(async {
1987/// let handle = glommio::executor().create_task_queue(
1988/// Shares::default(),
1989/// glommio::Latency::NotImportant,
1990/// "test_queue",
1991/// );
1992/// let task = glommio::spawn_local_into(async { 1 + 2 }, handle).expect("failed to spawn task");
1993/// assert_eq!(task.await, 3);
1994/// # });
1995/// ```
1996pub fn spawn_local_into<T>(
1997 future: impl Future<Output = T> + 'static,
1998 handle: TaskQueueHandle,
1999) -> Result<Task<T>>
2000where
2001 T: 'static,
2002{
2003 executor().spawn_local_into(future, handle)
2004}
2005
2006/// Spawns a task onto the current single-threaded executor.
2007///
2008/// If called from a [`LocalExecutor`], the task is spawned on it.
2009///
2010/// Otherwise, this method panics.
2011///
2012/// Proxy to [`ExecutorProxy::spawn_scoped_local`]
2013///
2014/// # Safety
2015///
2016/// `ScopedTask` depends on `drop` running or `.await` being called for
2017/// safety. See the struct [`ScopedTask`] for details.
2018///
2019/// # Examples
2020///
2021/// ```
2022/// use glommio::LocalExecutor;
2023///
2024/// let local_ex = LocalExecutor::default();
2025///
2026/// local_ex.run(async {
2027/// let non_static = 2;
2028/// let task = unsafe { glommio::spawn_scoped_local(async { 1 + non_static }) };
2029/// assert_eq!(task.await, 3);
2030/// });
2031/// ```
2032pub unsafe fn spawn_scoped_local<'a, T>(future: impl Future<Output = T> + 'a) -> ScopedTask<'a, T> {
2033 executor().spawn_scoped_local(future)
2034}
2035
2036/// Spawns a task onto the current single-threaded executor, in a particular
2037/// task queue
2038///
2039/// If called from a [`LocalExecutor`], the task is spawned on it.
2040///
2041/// Otherwise, this method panics.
2042///
2043/// Proxy to [`ExecutorProxy::spawn_scoped_local_into`]
2044///
2045/// # Safety
2046///
2047/// `ScopedTask` depends on `drop` running or `.await` being called for
2048/// safety. See the struct [`ScopedTask`] for details.
2049///
2050/// # Examples
2051///
2052/// ```
2053/// use glommio::{LocalExecutor, Shares};
2054///
2055/// let local_ex = LocalExecutor::default();
2056/// local_ex.run(async {
2057/// let handle = glommio::executor().create_task_queue(
2058/// Shares::default(),
2059/// glommio::Latency::NotImportant,
2060/// "test_queue",
2061/// );
2062/// let non_static = 2;
2063/// let task = unsafe {
2064/// glommio::spawn_scoped_local_into(async { 1 + non_static }, handle)
2065/// .expect("failed to spawn task")
2066/// };
2067/// assert_eq!(task.await, 3);
2068/// })
2069/// ```
2070pub unsafe fn spawn_scoped_local_into<'a, T>(
2071 future: impl Future<Output = T> + 'a,
2072 handle: TaskQueueHandle,
2073) -> Result<ScopedTask<'a, T>> {
2074 executor().spawn_scoped_local_into(future, handle)
2075}
2076
/// A proxy struct to the underlying [`LocalExecutor`]. It is accessible from
/// anywhere within a Glommio context using [`executor()`].
///
/// The struct has no fields of its own: each method looks up the executor
/// registered for the calling thread at the time it is invoked.
#[derive(Debug)]
pub struct ExecutorProxy {}
2081
2082impl ExecutorProxy {
    /// Checks if this task has run for too long and needs to be preempted.
    /// This is useful for situations where we can't call .await, for
    /// instance, if a [`RefMut`] is held. If this tests true, then the user
    /// is responsible for making any preparations necessary for calling
    /// .await and doing it themselves.
2088 ///
2089 /// # Examples
2090 ///
2091 /// ```
2092 /// use glommio::LocalExecutorBuilder;
2093 ///
2094 /// let ex = LocalExecutorBuilder::default()
2095 /// .spawn(|| async {
2096 /// loop {
2097 /// if glommio::executor().need_preempt() {
2098 /// break;
2099 /// }
2100 /// }
2101 /// })
2102 /// .unwrap();
2103 ///
2104 /// ex.join().unwrap();
2105 /// ```
2106 ///
2107 /// [`RefMut`]: https://doc.rust-lang.org/std/cell/struct.RefMut.html
2108 #[inline(always)]
2109 pub fn need_preempt(&self) -> bool {
2110 #[cfg(not(feature = "native-tls"))]
2111 return LOCAL_EX.with(|local_ex| local_ex.need_preempt());
2112
2113 #[cfg(feature = "native-tls")]
2114 return unsafe {
2115 LOCAL_EX
2116 .as_ref()
2117 .map(|ex| ex.need_preempt())
2118 .unwrap_or_default()
2119 };
2120 }
2121
2122 /// Conditionally yields the current task queue. The scheduler may then
2123 /// process other task queues according to their latency requirements.
2124 /// If a call to this function results in the current queue to yield,
2125 /// then the calling task is moved to the back of the yielded task
2126 /// queue.
2127 ///
    /// The conditions under which this function yields are an implementation
    /// detail subject to change, but they will always be somehow related to
    /// the latency guarantees that the task queues want to uphold in their
    /// `Latency::Matters` parameter (or `Latency::NotImportant`).
2132 ///
2133 /// This function is the central mechanism of task cooperation in Glommio
2134 /// and should be preferred over unconditional yielding methods like
2135 /// [`ExecutorProxy::yield_now`] and
2136 /// [`ExecutorProxy::yield_task_queue_now`].
2137 #[inline(always)]
2138 pub async fn yield_if_needed(&self) {
2139 #[cfg(not(feature = "native-tls"))]
2140 {
2141 let need_yield = if LOCAL_EX.is_set() {
2142 LOCAL_EX.with(|local_ex| {
2143 if local_ex.need_preempt() {
2144 local_ex.mark_me_for_yield();
2145 true
2146 } else {
2147 false
2148 }
2149 })
2150 } else {
2151 // We are not in a glommio context
2152 false
2153 };
2154
2155 if need_yield {
2156 futures_lite::future::yield_now().await;
2157 }
2158 }
2159
2160 #[cfg(feature = "native-tls")]
2161 unsafe {
2162 if self.need_preempt() {
2163 (*LOCAL_EX).mark_me_for_yield();
2164 futures_lite::future::yield_now().await;
2165 }
2166 }
2167 }
2168
2169 /// Unconditionally yields the current task and forces the scheduler
2170 /// to poll another task within the current task queue.
2171 /// Calling this wakes the current task and returns [`Poll::Pending`] once.
2172 ///
2173 /// Unless you know you need to yield right now, using
2174 /// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
2175 #[inline(always)]
2176 pub async fn yield_now(&self) {
2177 futures_lite::future::yield_now().await
2178 }
2179
2180 /// Unconditionally yields the current task queue and forces the scheduler
2181 /// to poll another queue. Use [`ExecutorProxy::yield_now`] to yield within
2182 /// a queue.
2183 ///
2184 /// Unless you know you need to yield right now, using
2185 /// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
2186 #[inline(always)]
2187 pub async fn yield_task_queue_now(&self) {
2188 #[cfg(not(feature = "native-tls"))]
2189 {
2190 if LOCAL_EX.is_set() {
2191 LOCAL_EX.with(|local_ex| {
2192 local_ex.mark_me_for_yield();
2193 })
2194 }
2195 futures_lite::future::yield_now().await;
2196 }
2197
2198 #[cfg(feature = "native-tls")]
2199 {
2200 if let Some(local_ex) = unsafe { LOCAL_EX.as_ref() } {
2201 local_ex.mark_me_for_yield();
2202 }
2203 futures_lite::future::yield_now().await;
2204 }
2205 }
2206
2207 #[inline(always)]
2208 pub(crate) fn reactor(&self) -> Rc<reactor::Reactor> {
2209 #[cfg(not(feature = "native-tls"))]
2210 return LOCAL_EX.with(|local_ex| local_ex.get_reactor());
2211
2212 #[cfg(feature = "native-tls")]
2213 return unsafe {
2214 LOCAL_EX
2215 .as_ref()
2216 .expect("this thread doesn't have a LocalExecutor running")
2217 .get_reactor()
2218 };
2219 }
2220
2221 /// Returns the id of the current executor
2222 ///
2223 /// If called from a [`LocalExecutor`], returns the id of the executor.
2224 ///
2225 /// Otherwise, this method panics.
2226 ///
2227 /// # Examples
2228 ///
2229 /// ```
2230 /// use glommio::{LocalExecutor, Task};
2231 ///
2232 /// let local_ex = LocalExecutor::default();
2233 ///
2234 /// local_ex.run(async {
2235 /// println!("my ID: {}", glommio::executor().id());
2236 /// });
2237 /// ```
2238 pub fn id(&self) -> usize {
2239 #[cfg(not(feature = "native-tls"))]
2240 return LOCAL_EX.with(|local_ex| local_ex.id());
2241
2242 #[cfg(feature = "native-tls")]
2243 return unsafe {
2244 LOCAL_EX
2245 .as_ref()
2246 .expect("this thread doesn't have a LocalExecutor running")
2247 .id()
2248 };
2249 }
2250
2251 /// Creates a new task queue, with a given latency hint and the provided
2252 /// name
2253 ///
2254 /// Each task queue is scheduled based on the [`Shares`] and [`Latency`]
2255 /// system, and tasks within a queue will be scheduled in serial.
2256 ///
2257 /// Returns an opaque handle that can later be used to launch tasks into
2258 /// that queue with [`local_into`].
2259 ///
2260 /// # Examples
2261 ///
2262 /// ```
2263 /// use glommio::{Latency, LocalExecutor, Shares};
2264 /// use std::time::Duration;
2265 ///
2266 /// let local_ex = LocalExecutor::default();
2267 /// local_ex.run(async move {
2268 /// let task_queue = glommio::executor().create_task_queue(
2269 /// Shares::default(),
2270 /// Latency::Matters(Duration::from_secs(1)),
2271 /// "my_tq",
2272 /// );
2273 /// let task = glommio::spawn_local_into(
2274 /// async {
2275 /// println!("Hello world");
2276 /// },
2277 /// task_queue,
2278 /// )
2279 /// .expect("failed to spawn task");
2280 /// });
2281 /// ```
2282 ///
2283 /// [`local_into`]: crate::spawn_local_into
2284 /// [`Shares`]: enum.Shares.html
2285 /// [`Latency`]: enum.Latency.html
2286 pub fn create_task_queue(
2287 &self,
2288 shares: Shares,
2289 latency: Latency,
2290 name: &str,
2291 ) -> TaskQueueHandle {
2292 #[cfg(not(feature = "native-tls"))]
2293 return LOCAL_EX.with(|local_ex| local_ex.create_task_queue(shares, latency, name));
2294
2295 #[cfg(feature = "native-tls")]
2296 return unsafe {
2297 LOCAL_EX
2298 .as_ref()
2299 .expect("this thread doesn't have a LocalExecutor running")
2300 .create_task_queue(shares, latency, name)
2301 };
2302 }
2303
2304 /// Returns the [`TaskQueueHandle`] that represents the TaskQueue currently
2305 /// running. This can be passed directly into [`crate::spawn_local_into`].
2306 /// This must be run from a task that was generated through
2307 /// [`crate::spawn_local`] or [`crate::spawn_local_into`]
2308 ///
2309 /// # Examples
2310 /// ```
2311 /// use glommio::{Latency, LocalExecutor, LocalExecutorBuilder, Shares};
2312 ///
2313 /// let ex = LocalExecutorBuilder::default()
2314 /// .spawn(|| async move {
2315 /// let original_tq = glommio::executor().current_task_queue();
2316 /// let new_tq = glommio::executor().create_task_queue(
2317 /// Shares::default(),
2318 /// Latency::NotImportant,
2319 /// "test",
2320 /// );
2321 ///
2322 /// let task = glommio::spawn_local_into(
2323 /// async move {
2324 /// glommio::spawn_local_into(
2325 /// async move {
2326 /// assert_eq!(glommio::executor().current_task_queue(), original_tq);
2327 /// },
2328 /// original_tq,
2329 /// )
2330 /// .unwrap();
2331 /// },
2332 /// new_tq,
2333 /// )
2334 /// .unwrap();
2335 /// task.await;
2336 /// })
2337 /// .unwrap();
2338 ///
2339 /// ex.join().unwrap();
2340 /// ```
2341 pub fn current_task_queue(&self) -> TaskQueueHandle {
2342 #[cfg(not(feature = "native-tls"))]
2343 return LOCAL_EX.with(|local_ex| local_ex.current_task_queue());
2344
2345 #[cfg(feature = "native-tls")]
2346 return unsafe {
2347 LOCAL_EX
2348 .as_ref()
2349 .expect("this thread doesn't have a LocalExecutor running")
2350 .current_task_queue()
2351 };
2352 }
2353
    /// Returns a [`Result`] with its `Ok` value wrapping a [`TaskQueueStats`]
    /// or a [`GlommioError`] of type [`QueueErrorKind`] if there is no task
    /// queue with this handle
2357 ///
2358 /// # Examples
2359 /// ```
2360 /// use glommio::{Latency, LocalExecutorBuilder, Shares};
2361 ///
2362 /// let ex = LocalExecutorBuilder::default()
2363 /// .spawn(|| async move {
2364 /// let new_tq = glommio::executor().create_task_queue(
2365 /// Shares::default(),
2366 /// Latency::NotImportant,
2367 /// "test",
2368 /// );
2369 /// println!(
2370 /// "Stats for test: {:?}",
2371 /// glommio::executor().task_queue_stats(new_tq).unwrap()
2372 /// );
2373 /// })
2374 /// .unwrap();
2375 ///
2376 /// ex.join().unwrap();
2377 /// ```
2378 ///
2379 /// [`ExecutorStats`]: struct.ExecutorStats.html
2380 /// [`GlommioError`]: crate::error::GlommioError
2381 /// [`QueueErrorKind`]: crate::error::QueueErrorKind
2382 /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
2383 pub fn task_queue_stats(&self, handle: TaskQueueHandle) -> Result<TaskQueueStats> {
2384 #[cfg(not(feature = "native-tls"))]
2385 return LOCAL_EX.with(|local_ex| match local_ex.get_queue(&handle) {
2386 Some(x) => Ok(x.borrow_mut().stats.take()),
2387 None => Err(GlommioError::queue_not_found(handle.index)),
2388 });
2389
2390 #[cfg(feature = "native-tls")]
2391 return match unsafe {
2392 LOCAL_EX
2393 .as_ref()
2394 .expect("this thread doesn't have a LocalExecutor running")
2395 .get_queue(&handle)
2396 } {
2397 Some(x) => Ok(x.borrow_mut().stats.take()),
2398 None => Err(GlommioError::queue_not_found(handle.index)),
2399 };
2400 }
2401
2402 /// Returns a collection of [`TaskQueueStats`] with information about all
2403 /// task queues in the system
2404 ///
2405 /// The collection can be anything that implements [`Extend`] and it is
2406 /// initially passed by the user, so they can control how allocations are
2407 /// done.
2408 ///
2409 /// # Examples
2410 /// ```
2411 /// use glommio::{executor, Latency, LocalExecutorBuilder, Shares};
2412 ///
2413 /// let ex = LocalExecutorBuilder::default()
2414 /// .spawn(|| async move {
2415 /// let new_tq = glommio::executor().create_task_queue(
2416 /// Shares::default(),
2417 /// Latency::NotImportant,
2418 /// "test",
2419 /// );
2420 /// let v = Vec::new();
2421 /// println!(
2422 /// "Stats for all queues: {:?}",
2423 /// glommio::executor().all_task_queue_stats(v)
2424 /// );
2425 /// })
2426 /// .unwrap();
2427 ///
2428 /// ex.join().unwrap();
2429 /// ```
2430 ///
2431 /// [`ExecutorStats`]: struct.ExecutorStats.html
2432 /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
2433 /// [`Extend`]: https://doc.rust-lang.org/std/iter/trait.Extend.html
2434 pub fn all_task_queue_stats<V>(&self, mut output: V) -> V
2435 where
2436 V: Extend<TaskQueueStats>,
2437 {
2438 #[cfg(not(feature = "native-tls"))]
2439 LOCAL_EX.with(|local_ex| {
2440 output.extend(
2441 local_ex
2442 .queues
2443 .borrow()
2444 .available_executors
2445 .values()
2446 .map(|x| x.borrow_mut().stats.take()),
2447 );
2448 });
2449
2450 #[cfg(feature = "native-tls")]
2451 output.extend(unsafe {
2452 LOCAL_EX
2453 .as_ref()
2454 .expect("this thread doesn't have a LocalExecutor running")
2455 .queues
2456 .borrow()
2457 .available_executors
2458 .values()
2459 .map(|x| x.borrow_mut().stats.take())
2460 });
2461
2462 output
2463 }
2464
2465 /// Returns a [`ExecutorStats`] struct with information about this Executor
2466 ///
2467 /// # Examples:
2468 ///
2469 /// ```
2470 /// use glommio::{executor, LocalExecutorBuilder};
2471 ///
2472 /// let ex = LocalExecutorBuilder::default()
2473 /// .spawn(|| async move {
2474 /// println!(
2475 /// "Stats for executor: {:?}",
2476 /// glommio::executor().executor_stats()
2477 /// );
2478 /// })
2479 /// .unwrap();
2480 ///
2481 /// ex.join().unwrap();
2482 /// ```
2483 ///
2484 /// [`ExecutorStats`]: struct.ExecutorStats.html
2485 pub fn executor_stats(&self) -> ExecutorStats {
2486 #[cfg(not(feature = "native-tls"))]
2487 return LOCAL_EX.with(|local_ex| std::mem::take(&mut local_ex.queues.borrow_mut().stats));
2488
2489 #[cfg(feature = "native-tls")]
2490 return std::mem::take(unsafe {
2491 &mut LOCAL_EX
2492 .as_ref()
2493 .expect("this thread doesn't have a LocalExecutor running")
2494 .queues
2495 .borrow_mut()
2496 .stats
2497 });
2498 }
2499
2500 /// Returns an [`IoStats`] struct with information about IO performed by
2501 /// this executor's reactor
2502 ///
2503 /// # Examples:
2504 ///
2505 /// ```
2506 /// use glommio::LocalExecutorBuilder;
2507 ///
2508 /// let ex = LocalExecutorBuilder::default()
2509 /// .spawn(|| async move {
2510 /// println!("Stats for executor: {:?}", glommio::executor().io_stats());
2511 /// })
2512 /// .unwrap();
2513 ///
2514 /// ex.join().unwrap();
2515 /// ```
2516 ///
2517 /// [`IoStats`]: crate::IoStats
2518 pub fn io_stats(&self) -> IoStats {
2519 #[cfg(not(feature = "native-tls"))]
2520 return LOCAL_EX.with(|local_ex| local_ex.get_reactor().io_stats());
2521
2522 #[cfg(feature = "native-tls")]
2523 return unsafe {
2524 LOCAL_EX
2525 .as_ref()
2526 .expect("this thread doesn't have a LocalExecutor running")
2527 .get_reactor()
2528 .io_stats()
2529 };
2530 }
2531
2532 /// Returns an [`IoStats`] struct with information about IO performed from
2533 /// the provided TaskQueue by this executor's reactor
2534 ///
2535 /// # Examples:
2536 ///
2537 /// ```
2538 /// use glommio::{Latency, LocalExecutorBuilder, Shares};
2539 ///
2540 /// let ex = LocalExecutorBuilder::default()
2541 /// .spawn(|| async move {
2542 /// let new_tq = glommio::executor().create_task_queue(
2543 /// Shares::default(),
2544 /// Latency::NotImportant,
2545 /// "test",
2546 /// );
2547 /// println!(
2548 /// "Stats for executor: {:?}",
2549 /// glommio::executor().task_queue_io_stats(new_tq)
2550 /// );
2551 /// })
2552 /// .unwrap();
2553 ///
2554 /// ex.join().unwrap();
2555 /// ```
2556 ///
2557 /// [`IoStats`]: crate::IoStats
2558 pub fn task_queue_io_stats(&self, handle: TaskQueueHandle) -> Result<IoStats> {
2559 #[cfg(not(feature = "native-tls"))]
2560 return LOCAL_EX.with(|local_ex| {
2561 match local_ex.get_reactor().task_queue_io_stats(&handle) {
2562 Some(x) => Ok(x),
2563 None => Err(GlommioError::queue_not_found(handle.index)),
2564 }
2565 });
2566
2567 #[cfg(feature = "native-tls")]
2568 return match unsafe {
2569 LOCAL_EX
2570 .as_ref()
2571 .expect("this thread doesn't have a LocalExecutor running")
2572 .get_reactor()
2573 .task_queue_io_stats(&handle)
2574 } {
2575 Some(x) => Ok(x),
2576 None => Err(GlommioError::queue_not_found(handle.index)),
2577 };
2578 }
2579
2580 /// Spawns a task onto the current single-threaded executor.
2581 ///
2582 /// If called from a [`LocalExecutor`], the task is spawned on it.
2583 /// Otherwise, this method panics.
2584 ///
2585 /// Note that there is no guarantee of when the spawned task is scheduled.
2586 /// The current task can continue its execution or be preempted by the
2587 /// newly spawned task immediately. See the documentation for the
2588 /// top-level [`Task`] for examples.
2589 ///
2590 /// # Examples
2591 ///
2592 /// ```
2593 /// use glommio::{LocalExecutor, Task};
2594 ///
2595 /// let local_ex = LocalExecutor::default();
2596 ///
2597 /// local_ex.run(async {
2598 /// let task = glommio::executor().spawn_local(async { 1 + 2 });
2599 /// assert_eq!(task.await, 3);
2600 /// });
2601 /// ```
2602 pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
2603 where
2604 T: 'static,
2605 {
2606 #[cfg(not(feature = "native-tls"))]
2607 return LOCAL_EX.with(|local_ex| Task::<T>(local_ex.spawn(future)));
2608
2609 #[cfg(feature = "native-tls")]
2610 return Task::<T>(unsafe {
2611 LOCAL_EX
2612 .as_ref()
2613 .expect("this thread doesn't have a LocalExecutor running")
2614 .spawn(future)
2615 });
2616 }
2617
2618 /// Spawns a task onto the current single-threaded executor, in a particular
2619 /// task queue
2620 ///
2621 /// If called from a [`LocalExecutor`], the task is spawned on it.
2622 /// Otherwise, this method panics.
2623 ///
2624 /// Note that there is no guarantee of when the spawned task is scheduled.
2625 /// The current task can continue its execution or be preempted by the
2626 /// newly spawned task immediately. See the documentation for the
2627 /// top-level [`Task`] for examples.
2628 ///
2629 /// # Examples
2630 ///
2631 /// ```
2632 /// # use glommio::{LocalExecutor, Shares, Task};
2633 ///
2634 /// # let local_ex = LocalExecutor::default();
2635 /// # local_ex.run(async {
2636 /// let handle = glommio::executor().create_task_queue(
2637 /// Shares::default(),
2638 /// glommio::Latency::NotImportant,
2639 /// "test_queue",
2640 /// );
2641 /// let task = glommio::executor()
2642 /// .spawn_local_into(async { 1 + 2 }, handle)
2643 /// .expect("failed to spawn task");
2644 /// assert_eq!(task.await, 3);
2645 /// # });
2646 /// ```
2647 pub fn spawn_local_into<T>(
2648 &self,
2649 future: impl Future<Output = T> + 'static,
2650 handle: TaskQueueHandle,
2651 ) -> Result<Task<T>>
2652 where
2653 T: 'static,
2654 {
2655 #[cfg(not(feature = "native-tls"))]
2656 return LOCAL_EX.with(|local_ex| local_ex.spawn_into(future, handle).map(Task::<T>));
2657
2658 #[cfg(feature = "native-tls")]
2659 return unsafe {
2660 LOCAL_EX
2661 .as_ref()
2662 .expect("this thread doesn't have a LocalExecutor running")
2663 .spawn_into(future, handle)
2664 }
2665 .map(Task::<T>);
2666 }
2667
2668 /// Spawns a task onto the current single-threaded executor.
2669 ///
2670 /// If called from a [`LocalExecutor`], the task is spawned on it.
2671 ///
2672 /// Otherwise, this method panics.
2673 ///
2674 /// # Safety
2675 ///
2676 /// `ScopedTask` depends on `drop` running or `.await` being called for
2677 /// safety. See the struct [`ScopedTask`] for details.
2678 ///
2679 /// # Examples
2680 ///
2681 /// ```
2682 /// use glommio::LocalExecutor;
2683 ///
2684 /// let local_ex = LocalExecutor::default();
2685 ///
2686 /// local_ex.run(async {
2687 /// let non_static = 2;
2688 /// let task = unsafe { glommio::executor().spawn_scoped_local(async { 1 + non_static }) };
2689 /// assert_eq!(task.await, 3);
2690 /// });
2691 /// ```
2692 pub unsafe fn spawn_scoped_local<'a, T>(
2693 &self,
2694 future: impl Future<Output = T> + 'a,
2695 ) -> ScopedTask<'a, T> {
2696 #[cfg(not(feature = "native-tls"))]
2697 return LOCAL_EX.with(|local_ex| ScopedTask::<'a, T>(local_ex.spawn(future), PhantomData));
2698
2699 #[cfg(feature = "native-tls")]
2700 return ScopedTask::<'a, T>(
2701 LOCAL_EX
2702 .as_ref()
2703 .expect("this thread doesn't have a LocalExecutor running")
2704 .spawn(future),
2705 PhantomData,
2706 );
2707 }
2708
2709 /// Spawns a task onto the current single-threaded executor, in a particular
2710 /// task queue
2711 ///
2712 /// If called from a [`LocalExecutor`], the task is spawned on it.
2713 ///
2714 /// Otherwise, this method panics.
2715 ///
2716 /// # Safety
2717 ///
2718 /// `ScopedTask` depends on `drop` running or `.await` being called for
2719 /// safety. See the struct [`ScopedTask`] for details.
2720 ///
2721 /// # Examples
2722 ///
2723 /// ```
2724 /// use glommio::{LocalExecutor, Shares};
2725 ///
2726 /// let local_ex = LocalExecutor::default();
2727 /// local_ex.run(async {
2728 /// let handle = glommio::executor().create_task_queue(
2729 /// Shares::default(),
2730 /// glommio::Latency::NotImportant,
2731 /// "test_queue",
2732 /// );
2733 /// let non_static = 2;
2734 /// let task = unsafe {
2735 /// glommio::executor()
2736 /// .spawn_scoped_local_into(async { 1 + non_static }, handle)
2737 /// .expect("failed to spawn task")
2738 /// };
2739 /// assert_eq!(task.await, 3);
2740 /// })
2741 /// ```
2742 pub unsafe fn spawn_scoped_local_into<'a, T>(
2743 &self,
2744 future: impl Future<Output = T> + 'a,
2745 handle: TaskQueueHandle,
2746 ) -> Result<ScopedTask<'a, T>> {
2747 #[cfg(not(feature = "native-tls"))]
2748 return LOCAL_EX.with(|local_ex| {
2749 local_ex
2750 .spawn_into(future, handle)
2751 .map(|x| ScopedTask::<'a, T>(x, PhantomData))
2752 });
2753
2754 #[cfg(feature = "native-tls")]
2755 return LOCAL_EX
2756 .as_ref()
2757 .expect("this thread doesn't have a LocalExecutor running")
2758 .spawn_into(future, handle)
2759 .map(|x| ScopedTask::<'a, T>(x, PhantomData));
2760 }
2761
2762 /// Spawns a blocking task into a background thread where blocking is
2763 /// acceptable.
2764 ///
2765 /// Glommio depends on cooperation from tasks in order to drive IO and meet
2766 /// latency requirements. Unyielding tasks are detrimental to the
2767 /// performance of the overall system, not just to the performance of
2768 /// the one stalling task.
2769 ///
2770 /// `spawn_blocking` is there as a last resort when a blocking task needs to
2771 /// be executed and cannot be made cooperative. Examples are:
2772 /// * Expensive syscalls that cannot use `io_uring`, such as `mmap`
2773 /// (especially with `MAP_POPULATE`)
2774 /// * Calls to synchronous third-party code (compression, encoding, etc.)
2775 ///
2776 /// # Note
2777 ///
2778 /// *This method is not meant to be a way to achieve compute parallelism.*
2779 /// Distributing work across executors is the better way to achieve that.
2780 ///
2781 /// # Examples
2782 ///
2783 /// ```
2784 /// use glommio::{LocalExecutor, Task};
2785 /// use std::time::Duration;
2786 ///
2787 /// let local_ex = LocalExecutor::default();
2788 ///
2789 /// local_ex.run(async {
2790 /// let task = glommio::executor()
2791 /// .spawn_blocking(|| {
2792 /// std::thread::sleep(Duration::from_millis(100));
2793 /// })
2794 /// .await;
2795 /// });
2796 /// ```
    pub fn spawn_blocking<F, R>(&self, func: F) -> impl Future<Output = R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        // Shared slot for the return value of `func`. It starts out
        // uninitialized and is written by the closure below.
        let result = Arc::new(Mutex::new(MaybeUninit::<R>::uninit()));
        // `enclose!` presumably clones `result` into the closure (confirm
        // against the macro). Note the order inside the closure: the receiver
        // `result.lock().unwrap()` is evaluated before the argument `func()`,
        // so the lock is held while `func` runs.
        // NOTE(review): that ordering looks load-bearing. A panic in `func`
        // would poison the mutex, and the later `lock().unwrap()` would then
        // panic rather than read an unwritten slot. Confirm before reordering.
        let f_inner = enclose::enclose!((result) move || {result.lock().unwrap().write(func());});

        // Hand the boxed closure over to the reactor; exactly one of the two
        // `waiter` definitions below is compiled in.
        #[cfg(not(feature = "native-tls"))]
        let waiter =
            LOCAL_EX.with(move |local_ex| local_ex.reactor.run_blocking(Box::new(f_inner)));

        #[cfg(feature = "native-tls")]
        let waiter = unsafe {
            LOCAL_EX
                .as_ref()
                .expect("this thread doesn't have a LocalExecutor running")
                .reactor
                .run_blocking(Box::new(f_inner))
        };

        async move {
            let source = waiter.await;
            assert!(source.collect_rw().await.is_ok());
            // SAFETY: NOTE(review) `assume_init` is only sound if the closure
            // has already written the slot. Presumably the source completes
            // only after the closure has run; confirm against `run_blocking`.
            unsafe {
                // Panics with "leak" if another `Arc` to the slot is still
                // alive at this point.
                let res_arc = Arc::try_unwrap(result).expect("leak");
                // Move the value out of the slot, leaving it uninitialized
                // again.
                let ret = std::mem::replace(
                    &mut *res_arc.lock().unwrap().deref_mut(),
                    MaybeUninit::<R>::uninit(),
                )
                .assume_init();
                ret
            }
        }
    }
2832}
2833
2834#[cfg(test)]
2835mod test {
2836 use core::mem::MaybeUninit;
2837 use std::{
2838 cell::Cell,
2839 collections::HashMap,
2840 sync::{
2841 atomic::{AtomicUsize, Ordering},
2842 Arc, Mutex,
2843 },
2844 task::Waker,
2845 };
2846
2847 use futures::{
2848 future::{join, join_all, poll_fn},
2849 join,
2850 };
2851
2852 use crate::{
2853 enclose,
2854 timer::{self, sleep, Timer},
2855 SharesManager,
2856 };
2857
2858 use super::*;
2859
2860 #[test]
2861 fn create_and_destroy_executor() {
2862 let mut var = Rc::new(RefCell::new(0));
2863 let local_ex = LocalExecutor::default();
2864 let varclone = var.clone();
2865 local_ex.run(async move {
2866 let mut m = varclone.borrow_mut();
2867 *m += 10;
2868 });
2869
2870 let v = Rc::get_mut(&mut var).unwrap();
2871 let v = v.replace(0);
2872 assert_eq!(v, 10);
2873 }
2874
2875 #[test]
2876 fn create_fail_to_bind() {
2877 // If you have a system with 4 billion CPUs let me know and I will
2878 // update this test.
2879 if LocalExecutorBuilder::new(Placement::Fixed(usize::MAX))
2880 .make()
2881 .is_ok()
2882 {
2883 unreachable!("Should have failed");
2884 }
2885 }
2886
2887 #[test]
2888 fn bind_to_cpu_set_range() {
2889 // libc supports cpu ids up to 1023 and will use the intersection of values
2890 // specified by the cpu mask and those present on the system
2891 // https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html#NOTES
2892 assert!(bind_to_cpu_set(vec![0, 1, 2, 3]).is_ok());
2893 assert!(bind_to_cpu_set(0..1024).is_ok());
2894 assert!(bind_to_cpu_set(0..1025).is_err());
2895 }
2896
2897 #[test]
2898 fn create_and_bind() {
2899 if let Err(x) = LocalExecutorBuilder::new(Placement::Fixed(0)).make() {
2900 panic!("got error {:?}", x);
2901 }
2902 }
2903
2904 #[test]
2905 #[should_panic]
2906 fn spawn_without_executor() {
2907 let _ = LocalExecutor::default();
2908 std::mem::drop(crate::spawn_local(async move {}));
2909 }
2910
2911 #[test]
2912 fn invalid_task_queue() {
2913 let local_ex = LocalExecutor::default();
2914 local_ex.run(async {
2915 let task = crate::spawn_local_into(
2916 async move {
2917 unreachable!("Should not have executed this");
2918 },
2919 TaskQueueHandle { index: 1 },
2920 );
2921
2922 if task.is_ok() {
2923 unreachable!("Should have failed");
2924 }
2925 });
2926 }
2927
2928 #[test]
2929 fn ten_yielding_queues() {
2930 let local_ex = LocalExecutor::default();
2931
2932 // 0 -> no one
2933 // 1 -> t1
2934 // 2 -> t2...
2935 let executed_last = Rc::new(RefCell::new(0));
2936 local_ex.run(async {
2937 let mut joins = Vec::with_capacity(10);
2938 for id in 1..11 {
2939 let exec = executed_last.clone();
2940 joins.push(crate::spawn_local(async move {
2941 for _ in 0..10_000 {
2942 {
2943 let mut last = exec.borrow_mut();
2944 assert_ne!(id, *last);
2945 *last = id;
2946 }
2947 crate::executor().yield_task_queue_now().await;
2948 }
2949 }));
2950 }
2951 futures::future::join_all(joins).await;
2952 });
2953 }
2954
    // Spawns a busy-looping task into a `Latency::NotImportant` queue and a
    // second task into a queue with a 2ms latency requirement. The busy task
    // only finishes once the second task has run and set `lat_status`, so the
    // test passes only if the busy task gives way via `yield_if_needed`.
    #[test]
    fn task_with_latency_requirements() {
        let local_ex = LocalExecutor::default();

        local_ex.run(async {
            let not_latency = crate::executor().create_task_queue(
                Shares::default(),
                Latency::NotImportant,
                "test",
            );
            let latency = crate::executor().create_task_queue(
                Shares::default(),
                Latency::Matters(Duration::from_millis(2)),
                "testlat",
            );

            // Set by the busy task as soon as it starts running.
            let nolat_started = Rc::new(RefCell::new(false));
            // Set by the latency-sensitive task once it got to run.
            let lat_status = Rc::new(RefCell::new(false));

            // Loop until need_preempt is set. It is set to 2ms, but because this is a test
            // and can be running overcommitted or in whichever shared infrastructure, we'll
            // give the timer generous slack before declaring it broken. Note that
            // `as_secs()` truncates, so the check below only trips once two whole
            // seconds have elapsed.
            let nolat = local_ex
                .spawn_into(
                    crate::enclose! { (nolat_started, lat_status)
                        async move {
                            *(nolat_started.borrow_mut()) = true;

                            let start = Instant::now();
                            // Now busy loop and make sure that we yield when we have to.
                            loop {
                                if *(lat_status.borrow()) {
                                    break; // Success!
                                }
                                if start.elapsed().as_secs() > 1 {
                                    panic!("Never received preempt signal");
                                }
                                crate::yield_if_needed().await;
                            }
                        }
                    },
                    not_latency,
                )
                .unwrap();

            let lat = local_ex
                .spawn_into(
                    crate::enclose! { (nolat_started, lat_status)
                        async move {
                            // In case we are executed first, yield to the other task
                            loop {
                                if !(*(nolat_started.borrow())) {
                                    crate::executor().yield_task_queue_now().await;
                                } else {
                                    break;
                                }
                            }
                            // The busy task is known to have started; signal it to stop.
                            *(lat_status.borrow_mut()) = true;
                        }
                    },
                    latency,
                )
                .unwrap();

            futures::join!(nolat, lat);
        });
    }
3022
// Spawns one task on the default queue and one on each of two freshly created
// queues, and checks that `current_task_queue()` reports the expected queue
// index from inside each task.
#[test]
fn current_task_queue_matches() {
    let runtime = LocalExecutor::default();
    runtime.run(async {
        let first_queue = crate::executor().create_task_queue(
            Shares::default(),
            Latency::NotImportant,
            "test1",
        );
        let second_queue = crate::executor().create_task_queue(
            Shares::default(),
            Latency::NotImportant,
            "test2",
        );

        // Remember the indices before the handles are handed to the spawn calls.
        let first_index = first_queue.index;
        let second_index = second_queue.index;

        let on_default = crate::spawn_local(async {
            let current = crate::executor().current_task_queue();
            assert_eq!(current.index, 0);
        });

        let on_first = crate::spawn_local_into(
            async move {
                let current = crate::executor().current_task_queue();
                assert_eq!(current.index, first_index);
            },
            first_queue,
        )
        .unwrap();

        let on_second = crate::spawn_local_into(
            async move {
                let current = crate::executor().current_task_queue();
                assert_eq!(current.index, second_index);
            },
            second_queue,
        )
        .unwrap();

        futures::join!(on_default, on_first, on_second);
    });
}
3060
// Both queues are created with `Latency::NotImportant`. The first task spins
// for roughly 99ms, calling `yield_if_needed()` on every iteration, and panics
// if it ever observes the second task's counter ahead of its own. The second
// task yields its queue until its own counter is smaller than the first
// task's counter.
#[test]
fn task_optimized_for_throughput() {
    let local_ex = LocalExecutor::default();

    local_ex.run(async {
        let tq1 = crate::executor().create_task_queue(
            Shares::default(),
            Latency::NotImportant,
            "test",
        );
        let tq2 = crate::executor().create_task_queue(
            Shares::default(),
            Latency::NotImportant,
            "testlat",
        );

        // Iteration counters, one per task, shared between both tasks.
        let first_started = Rc::new(RefCell::new(0));
        let second_status = Rc::new(RefCell::new(0));

        let first = local_ex
            .spawn_into(
                crate::enclose! { (first_started, second_status)
                    async move {
                        let start = Instant::now();
                        // Busy loop for ~99ms, offering to yield on every iteration.
                        // NOTE(review): presumably no preemption is expected here because
                        // neither queue is latency-sensitive — confirm against the
                        // executor's preempt timer.
                        loop {
                            {
                                let mut count = first_started.borrow_mut();
                                *count += 1;

                                if start.elapsed().as_millis() >= 99 {
                                    break;
                                }

                                // The other task getting ahead of us means we gave up the CPU.
                                if *count < *(second_status.borrow()) {
                                    panic!("I was preempted but should not have been");
                                }
                            }
                            crate::yield_if_needed().await;
                        }
                    }
                },
                tq1,
            )
            .unwrap();

        let second = local_ex
            .spawn_into(
                crate::enclose! { (first_started, second_status)
                    async move {
                        // Keep yielding until the first task's counter has overtaken ours.
                        loop {
                            {
                                let mut count = second_status.borrow_mut();
                                *count += 1;
                                if *count < *(first_started.borrow()) {
                                    break;
                                }
                            }
                            crate::executor().yield_task_queue_now().await;
                        }
                    }
                },
                tq2,
            )
            .unwrap();

        futures::join!(first, second);
    });
}
3131
// Detaches a task that yields in an endless loop and then waits on a short
// timer; the test only finishes if `run` returns while that task is still
// alive.
#[test]
fn test_detach() {
    let runtime = LocalExecutor::default();

    runtime.run(async {
        let spinner = crate::spawn_local(async {
            loop {
                crate::executor().yield_task_queue_now().await;
            }
        });
        spinner.detach();

        Timer::new(Duration::from_micros(100)).await;
    });
}
3147
3148 /// As far as impl From<libc::timeval> for Duration is not allowed.
3149 fn from_timeval(v: libc::timeval) -> Duration {
3150 Duration::from_secs(v.tv_sec as u64) + Duration::from_micros(v.tv_usec as u64)
3151 }
3152
3153 fn getrusage() -> Duration {
3154 let mut s0 = MaybeUninit::<libc::rusage>::uninit();
3155 let err = unsafe { libc::getrusage(libc::RUSAGE_THREAD, s0.as_mut_ptr()) };
3156 if err != 0 {
3157 panic!("getrusage error = {}", err);
3158 }
3159 let usage = unsafe { s0.assume_init() };
3160 from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime)
3161 }
3162
// With a default executor, sleeping for one second inside a latency-sensitive
// queue must cost less than 10ms of CPU time as measured by `getrusage()`.
#[test]
fn test_no_spin() {
    let runtime = LocalExecutor::default();
    let latency_queue = runtime.create_task_queue(
        Shares::default(),
        Latency::Matters(Duration::from_millis(10)),
        "my_tq",
    );

    let cpu_before = getrusage();
    runtime.run(async {
        let sleeper = crate::spawn_local_into(
            async { timer::sleep(Duration::from_secs(1)).await },
            latency_queue,
        )
        .expect("failed to spawn task");
        sleeper.await;
    });
    let cpu_spent = getrusage() - cpu_before;

    assert!(
        cpu_spent < Duration::from_millis(10),
        "expected user time on LE is less than 10 millisecond"
    );
}
3186
// Compares CPU consumption of a one-second sleep on a default executor (less
// than 10ms expected) against an executor built with a 100ms
// `spin_before_park` (at least 50ms expected).
#[test]
fn test_spin() {
    let nap = Duration::from_secs(1);

    let parking_ex = LocalExecutorBuilder::default().make().unwrap();
    parking_ex.run(async {
        let before = getrusage();
        timer::sleep(nap).await;
        let burned = getrusage() - before;

        assert!(
            burned < Duration::from_millis(10),
            "expected user time on LE0 is less than 10 millisecond"
        );
    });

    let spinning_builder = LocalExecutorBuilder::new(Placement::Fixed(0))
        .spin_before_park(Duration::from_millis(100));
    let spinning_ex = spinning_builder.make().unwrap();

    spinning_ex.run(async {
        let before = getrusage();
        timer::sleep(nap).await;
        let burned = getrusage() - before;

        // The thread may not actually have been on a CPU for the whole 100ms
        // of wall-clock spinning if other threads compete with it, so the
        // bound is relaxed.
        assert!(
            burned >= Duration::from_millis(50),
            "expected user time on LE is much greater than 50 millisecond",
        );
    });
}
3220
// Checks `executor_stats().total_runtime()` at three points: right at the
// start (under 10ns), after a 200ms busy loop followed by a yield (at least
// 200ms), and after a further 2s timer sleep (still under 400ms). The same
// sequence runs on a default executor and on one that spins before parking.
#[test]
fn test_runtime_stats() {
    let dur = Duration::from_secs(2);
    let ex0 = LocalExecutorBuilder::default().make().unwrap();
    ex0.run(async {
        assert!(
            crate::executor().executor_stats().total_runtime() < Duration::from_nanos(10),
            "expected runtime on LE {:#?} is less than 10 ns",
            crate::executor().executor_stats().total_runtime()
        );

        // Burn 200ms of CPU, then yield before reading the stats again.
        let now = Instant::now();
        while now.elapsed().as_millis() < 200 {}
        crate::executor().yield_task_queue_now().await;
        assert!(
            crate::executor().executor_stats().total_runtime() >= Duration::from_millis(200),
            "expected runtime on LE0 {:#?} is greater than 200 ms",
            crate::executor().executor_stats().total_runtime()
        );

        // The 2s sleep must not push the reported runtime past 400ms.
        timer::sleep(dur).await;
        assert!(
            crate::executor().executor_stats().total_runtime() < Duration::from_millis(400),
            "expected runtime on LE0 {:#?} is not greater than 400 ms",
            crate::executor().executor_stats().total_runtime()
        );
    });

    let ex = LocalExecutorBuilder::new(Placement::Fixed(0))
        // 5s of spinning before parking is longer than the 2s sleep below, so
        // the entire sleep should be spent spinning
        .spin_before_park(Duration::from_secs(5))
        .make()
        .unwrap();
    ex.run(async {
        crate::spawn_local(async move {
            assert!(
                crate::executor().executor_stats().total_runtime() < Duration::from_nanos(10),
                "expected runtime on LE {:#?} is less than 10 ns",
                crate::executor().executor_stats().total_runtime()
            );

            // Same sequence as above, this time from inside a spawned task.
            let now = Instant::now();
            while now.elapsed().as_millis() < 200 {}
            crate::executor().yield_task_queue_now().await;
            assert!(
                crate::executor().executor_stats().total_runtime()
                    >= Duration::from_millis(200),
                "expected runtime on LE {:#?} is greater than 200 ms",
                crate::executor().executor_stats().total_runtime()
            );
            timer::sleep(dur).await;
            assert!(
                crate::executor().executor_stats().total_runtime() < Duration::from_millis(400),
                "expected runtime on LE {:#?} is not greater than 400 ms",
                crate::executor().executor_stats().total_runtime()
            );
        })
        .await;
    });
}
3281
3282 // Spin for 2ms and then yield. How many shares we have should control how many
3283 // quantas we manage to execute.
3284 async fn work_quanta() {
3285 let now = Instant::now();
3286 while now.elapsed().as_millis() < 2 {}
3287 crate::executor().yield_task_queue_now().await;
3288 }
3289
// Expands to the body of a static-shares fairness test.
//
// `$s1` and `$s2` are the static share counts of two task queues, and `$work`
// is the block each task runs repeatedly for five seconds. The test then
// asserts that the second queue's fraction of all completed iterations is
// within 0.1 of `$s2 / ($s1 + $s2)`.
macro_rules! test_static_shares {
    ( $s1:expr, $s2:expr, $work:block ) => {
        let local_ex = LocalExecutor::default();

        local_ex.run(async {
            // Run a latency queue, otherwise a queue will run for too long uninterrupted
            // and we'd have to run this test for a very long time for things to equalize.
            let tq1 = crate::executor().create_task_queue(
                Shares::Static($s1),
                Latency::Matters(Duration::from_millis(1)),
                "test_1",
            );
            let tq2 = crate::executor().create_task_queue(
                Shares::Static($s2),
                Latency::Matters(Duration::from_millis(1)),
                "test_2",
            );

            // Number of `$work` iterations completed by each queue's task.
            let tq1_count = Rc::new(Cell::new(0));
            let tq2_count = Rc::new(Cell::new(0));
            let now = Instant::now();

            let t1 = crate::spawn_local_into(
                enclose! { (tq1_count, now) async move {
                    while now.elapsed().as_secs() < 5 {
                        $work;
                        tq1_count.replace(tq1_count.get() + 1);
                    }
                }},
                tq1,
            )
            .unwrap();

            let t2 = crate::spawn_local_into(
                enclose! { (tq2_count, now ) async move {
                    while now.elapsed().as_secs() < 5 {
                        $work;
                        tq2_count.replace(tq2_count.get() + 1);
                    }
                }},
                tq2,
            )
            .unwrap();

            join!(t1, t2);

            // Fraction of the total work that queue 2 should have done vs. actually did.
            let expected_ratio = $s2 as f64 / (($s2 + $s1) as f64);
            let actual_ratio =
                tq2_count.get() as f64 / ((tq1_count.get() + tq2_count.get()) as f64);

            // Be gentle: we don't know if we're running against other threads, under which
            // conditions, etc
            assert!((expected_ratio - actual_ratio).abs() < 0.1);
        });
    };
}
3346
// Static shares of 1000 vs. 10, with each iteration running one
// `work_quanta()`.
#[test]
fn test_shares_high_disparity_fat_task() {
    test_static_shares!(1000, 10, { work_quanta().await });
}
3351
// Equal static shares (1000 vs. 1000), with each iteration running one
// `work_quanta()`.
#[test]
fn test_shares_low_disparity_fat_task() {
    test_static_shares!(1000, 1000, { work_quanta().await });
}
3356
// Allocates a 42-byte buffer through `allocate_dma_buffer`, writes two bytes
// and reads them back, checking the reported length along the way.
#[test]
fn test_allocate_dma_buffer() {
    let runtime = LocalExecutor::default();
    runtime.run(async {
        let mut dma = crate::allocate_dma_buffer(42);
        assert_eq!(dma.len(), 42);

        {
            let writable = dma.as_bytes_mut();
            writable[0] = 12;
            writable[12] = 13;
        }
        assert_eq!(dma.as_bytes_mut().len(), 42);

        let readable = dma.as_bytes();
        assert_eq!(readable[0], 12);
        assert_eq!(readable[12], 13);
    });
}
3369
// Allocates a 42-byte buffer through `allocate_dma_buffer_global`, writes two
// bytes and reads them back, checking the reported length along the way.
#[test]
fn test_allocate_dma_buffer_global() {
    LocalExecutor::default().run(async {
        let mut buffer = crate::allocate_dma_buffer_global(42);
        assert_eq!(buffer.len(), 42);
        buffer.as_bytes_mut()[0] = 12;
        buffer.as_bytes_mut()[12] = 13;
        // The mutable view must expose the full requested length as well.
        assert_eq!(buffer.as_bytes_mut().len(), 42);
        assert_eq!(buffer.as_bytes()[0], 12);
        assert_eq!(buffer.as_bytes()[12], 13);
    });
}
3383
/// Share source for the dynamic-shares test: holds a share count that
/// `tick` switches from low to high once enough time has passed.
struct DynamicSharesTest {
    shares: Cell<usize>,
}

impl DynamicSharesTest {
    /// Creates a reference-counted instance whose share count starts at 0.
    fn new() -> Rc<Self> {
        let shares = Cell::new(0);
        Rc::new(Self { shares })
    }

    /// Sets the share count to 1 while `millis` is below 1000 and to 1000
    /// from then on.
    fn tick(&self, millis: u64) {
        let target = match millis {
            0..=999 => 1,
            _ => 1000,
        };
        self.shares.set(target);
    }
}
3402
/// Exposes the test's current share count to the scheduler.
impl SharesManager for DynamicSharesTest {
    /// Returns whatever value the last `tick` stored (0 before the first tick).
    fn shares(&self) -> usize {
        self.shares.get()
    }

    /// Reports a fixed adjustment period of 1ms.
    fn adjustment_period(&self) -> Duration {
        Duration::from_millis(1)
    }
}
3412
// Runs a reference queue with 1000 static shares next to a queue whose
// shares come from `DynamicSharesTest`, for two seconds. Iterations are
// counted per elapsed second, and the dynamic queue's share of the work must
// be clearly higher in the second bucket than in the first.
#[test]
fn test_dynamic_shares() {
    let local_ex = LocalExecutor::default();

    local_ex.run(async {
        let bm = DynamicSharesTest::new();
        // Reference task queue.
        let tq1 = crate::executor().create_task_queue(
            Shares::Static(1000),
            Latency::Matters(Duration::from_millis(1)),
            "test_1",
        );
        let tq2 = crate::executor().create_task_queue(
            Shares::Dynamic(bm.clone()),
            Latency::Matters(Duration::from_millis(1)),
            "test_2",
        );

        // Per-queue iteration counts, indexed by elapsed whole seconds (0 or 1).
        let tq1_count = Rc::new(RefCell::new(vec![0, 0]));
        let tq2_count = Rc::new(RefCell::new(vec![0, 0]));
        let now = Instant::now();

        let t1 = crate::spawn_local_into(
            enclose! { (tq1_count, now) async move {
                loop {
                    let secs = now.elapsed().as_secs();
                    if secs >= 2 {
                        break;
                    }
                    (*tq1_count.borrow_mut())[secs as usize] += 1;
                    crate::executor().yield_task_queue_now().await;
                }
            }},
            tq1,
        )
        .unwrap();

        let t2 = crate::spawn_local_into(
            enclose! { (tq2_count, now, bm) async move {
                loop {
                    let elapsed = now.elapsed();
                    let secs = elapsed.as_secs();
                    if secs >= 2 {
                        break;
                    }
                    // Feed the elapsed time to the share source on every iteration.
                    bm.tick(elapsed.as_millis() as u64);
                    (*tq2_count.borrow_mut())[secs as usize] += 1;
                    crate::executor().yield_task_queue_now().await;
                }
            }},
            tq2,
        )
        .unwrap();

        join!(t1, t2);
        // Keep this very simple because every new processor, every load condition, will
        // yield different results. All we want to validate is: during the first
        // second the shares were very low, so we should have received a very
        // low ratio. During the second second we should have accumulated much
        // more. Real numbers are likely much higher than the targets, but
        // those targets are safe.
        let ratios: Vec<f64> = tq1_count
            .borrow()
            .iter()
            .zip(tq2_count.borrow().iter())
            .map(|(x, y)| *y as f64 / *x as f64)
            .collect();
        assert!(ratios[1] > ratios[0]);
        assert!(ratios[0] < 0.25);
        assert!(ratios[1] > 0.50);
    });
}
3485
// Regression test for issue 241: awaiting two detached, already-trivial
// tasks one after the other must not hang.
#[test]
fn multiple_spawn() {
    let runtime = LocalExecutor::default();
    runtime.run(async {
        let first = crate::spawn_local(async {}).detach();
        first.await;

        // In issue 241 it was the presence of this second detached waiter
        // that caused the program to hang.
        let second = crate::spawn_local(async {}).detach();
        second.await;
    });
}
3496
// A panic raised inside the future passed to `run` must reach the caller
// with its original message.
// NOTE(review): the name presumably means "panic is not lost" — confirm
// before renaming.
#[test]
#[should_panic(expected = "Message!")]
fn panic_is_not_list() {
    LocalExecutor::default().run(async { panic!("Message!") });
}
3502
/// Future driven through a shared waker slot.
///
/// A poll that finds the slot empty stores the current waker in it and
/// returns `Pending`; a poll that finds a waker in the slot removes it and
/// returns `Ready`.
struct TestFuture {
    w: Arc<Mutex<Option<Waker>>>,
}

impl Future for TestFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut slot = self.w.lock().unwrap();
        if slot.take().is_some() {
            // A waker from an earlier poll was still parked in the slot.
            return Poll::Ready(());
        }
        *slot = Some(cx.waker().clone());
        Poll::Pending
    }
}
3520
// One executor awaits a `TestFuture`; a second executor checks the shared
// slot once per second and, as soon as a waker is present, wakes it by
// reference.
#[test]
fn cross_executor_wake_by_ref() {
    let waker_slot = Arc::new(Mutex::new(None));
    let t = Arc::clone(&waker_slot);
    let fut = TestFuture { w: waker_slot };

    let waiting = LocalExecutorBuilder::default()
        .spawn(|| async move {
            fut.await;
        })
        .unwrap();

    let waking = LocalExecutorBuilder::default()
        .spawn(|| async move {
            loop {
                sleep(Duration::from_secs(1)).await;
                let slot = t.lock().unwrap();
                if let Some(waker) = slot.as_ref() {
                    waker.wake_by_ref();
                    break;
                }
            }
        })
        .unwrap();

    waiting.join().unwrap();
    waking.join().unwrap();
}
3550
// Same setup as the by-reference variant, except that the second executor
// clones the stored waker and consumes the clone with `wake()`.
#[test]
fn cross_executor_wake_by_value() {
    let waker_slot = Arc::new(Mutex::new(None));
    let t = Arc::clone(&waker_slot);
    let fut = TestFuture { w: waker_slot };

    let waiting = LocalExecutorBuilder::default()
        .spawn(|| async move {
            fut.await;
        })
        .unwrap();

    let waking = LocalExecutorBuilder::default()
        .spawn(|| async move {
            loop {
                sleep(Duration::from_secs(1)).await;
                let slot = t.lock().unwrap();
                if let Some(owned_waker) = slot.clone() {
                    owned_waker.wake();
                    break;
                }
            }
        })
        .unwrap();

    waiting.join().unwrap();
    waking.join().unwrap();
}
3580
// Wakes, from a remote executor, the waker of a future that runs as a
// spawned and detached task whose handle is being awaited.
#[test]
fn cross_executor_wake_with_join_handle() {
    let waker_slot = Arc::new(Mutex::new(None));
    let t = Arc::clone(&waker_slot);
    let fut = TestFuture { w: waker_slot };

    let waiting = LocalExecutorBuilder::default()
        .spawn(|| async move {
            crate::spawn_local(fut).detach().await;
        })
        .unwrap();

    let waking = LocalExecutorBuilder::default()
        .spawn(|| async move {
            loop {
                sleep(Duration::from_secs(1)).await;
                let slot = t.lock().unwrap();
                if let Some(owned_waker) = slot.clone() {
                    owned_waker.wake();
                    break;
                }
            }
        })
        .unwrap();

    waiting.join().unwrap();
    waking.join().unwrap();
}
3612
// The first executor polls the future only once and then finishes, so it
// won't be alive to receive the notification. Waking the leftover waker by
// reference from the second executor must still be survivable.
#[test]
fn cross_executor_wake_early_drop() {
    let waker_slot = Arc::new(Mutex::new(None));
    let t = Arc::clone(&waker_slot);
    let fut = TestFuture { w: waker_slot };

    let short_lived = LocalExecutorBuilder::default()
        .spawn(|| async move {
            let _polled_once = futures_lite::future::poll_once(fut).await;
        })
        .unwrap();

    let waking = LocalExecutorBuilder::default()
        .spawn(|| async move {
            loop {
                sleep(Duration::from_secs(1)).await;
                let slot = t.lock().unwrap();
                if let Some(waker) = slot.as_ref() {
                    waker.wake_by_ref();
                    break;
                }
            }
        })
        .unwrap();

    short_lived.join().unwrap();
    waking.join().unwrap();
}
3644
// The first executor is joined (so it is certainly gone) before the second
// one even starts; the second then clones the waker left behind in the slot
// and notifies it. We should still survive.
#[test]
fn cross_executor_wake_hold_waker() {
    let waker_slot = Arc::new(Mutex::new(None));
    let t = Arc::clone(&waker_slot);
    let fut = TestFuture { w: waker_slot };

    let short_lived = LocalExecutorBuilder::default()
        .spawn(|| async move {
            let _polled_once = futures_lite::future::poll_once(fut).await;
        })
        .unwrap();
    short_lived.join().unwrap();

    let waking = LocalExecutorBuilder::default()
        .spawn(|| async move {
            // The lock guard is released at the end of this statement, before
            // the waker is used.
            let stored = t.lock().unwrap().clone();
            let stale_waker = stored.unwrap();
            stale_waker.wake_by_ref();
        })
        .unwrap();

    waking.join().unwrap();
}
3671
// Spawns a four-shard unbound pool in which every shard bumps a shared
// counter and returns the previous value. Checks the number of handles, that
// the returned values are exactly `0..4`, and the final counter value.
#[test]
fn executor_pool_builder() {
    let nr_cpus = 4;

    let count = Arc::new(AtomicUsize::new(0));
    let handles = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_cpus))
        .on_all_shards({
            let count = Arc::clone(&count);
            || async move { count.fetch_add(1, Ordering::Relaxed) }
        })
        .unwrap();

    // Compile-time check that the raw join handles expose their thread.
    let _: std::thread::ThreadId = handles.handles[0].thread().id();

    // Ask the collection for its length directly instead of walking an iterator.
    assert_eq!(nr_cpus, handles.handles().len());

    let mut fut_output = handles
        .join_all()
        .into_iter()
        .map(Result::unwrap)
        .collect::<Vec<_>>();

    // Shards finish in arbitrary order, so sort before comparing.
    fut_output.sort_unstable();

    assert_eq!(fut_output, (0..nr_cpus).collect::<Vec<_>>());

    assert_eq!(nr_cpus, count.load(Ordering::Relaxed));
}
3700
// Asking for a pool of zero executors must be rejected with an error.
#[test]
fn executor_invalid_executor_count() {
    let empty_pool = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(0));
    let outcome = empty_pool.on_all_shards(|| async move {});
    assert!(outcome.is_err());
}
3707
// For every pool placement kind, spawns one executor per online CPU and
// checks that each executor id shows up exactly once. For the placements
// other than `Unbound` and `Fenced` it also checks that every observed
// affinity mask is distinct.
#[test]
fn executor_pool_builder_placements() {
    let cpu_set = CpuSet::online().unwrap();
    assert!(!cpu_set.is_empty());

    // The range `1..2` yields only `nn == 1`, i.e. one executor per online CPU.
    for nn in 1..2 {
        let nr_execs = nn * cpu_set.len();
        let mut placements = vec![
            PoolPlacement::Unbound(nr_execs),
            PoolPlacement::Fenced(nr_execs, cpu_set.clone()),
            PoolPlacement::MaxSpread(nr_execs, None),
            PoolPlacement::MaxSpread(nr_execs, Some(cpu_set.clone())),
            PoolPlacement::MaxPack(nr_execs, None),
            PoolPlacement::MaxPack(nr_execs, Some(cpu_set.clone())),
        ];

        for pp in placements.drain(..) {
            // Occurrence counts keyed by executor id and by affinity mask.
            let ids = Arc::new(Mutex::new(HashMap::new()));
            let cpus = Arc::new(Mutex::new(HashMap::new()));
            // Only these placements get the per-affinity checks further down.
            let cpu_hard_bind =
                !matches!(pp, PoolPlacement::Unbound(_) | PoolPlacement::Fenced(_, _));

            let handles = LocalExecutorPoolBuilder::new(pp)
                .on_all_shards({
                    let ids = Arc::clone(&ids);
                    let cpus = Arc::clone(&cpus);
                    || async move {
                        ids.lock()
                            .unwrap()
                            .entry(crate::executor().id())
                            .and_modify(|e| *e += 1)
                            .or_insert(1);

                        // Pid 0 refers to the calling thread.
                        let pid = nix::unistd::Pid::from_raw(0);
                        let cpu = nix::sched::sched_getaffinity(pid).unwrap();
                        cpus.lock()
                            .unwrap()
                            .entry(cpu)
                            .and_modify(|e| *e += 1)
                            .or_insert(1);
                    }
                })
                .unwrap();

            assert_eq!(nr_execs, handles.handles().len());
            handles
                .join_all()
                .into_iter()
                .for_each(|r| assert!(r.is_ok()));

            // Every executor must have reported a distinct id exactly once.
            assert_eq!(nr_execs, ids.lock().unwrap().len());
            ids.lock().unwrap().values().for_each(|v| assert_eq!(*v, 1));

            if cpu_hard_bind {
                assert_eq!(nr_execs, cpus.lock().unwrap().len());
                cpus.lock()
                    .unwrap()
                    .values()
                    .for_each(|v| assert_eq!(*v, nn));
            }
        }
    }
}
3771
// Checks the shard-count limit of each placement kind: with as many shards
// as online CPUs every placement must work, and with one shard more exactly
// the placements flagged as shard-limited (`MaxSpread`, `MaxPack`) must fail.
#[test]
fn executor_pool_builder_shards_limit() {
    let cpu_set = CpuSet::online().unwrap();
    assert!(!cpu_set.is_empty());
    let nr_cpus = cpu_set.len();

    // Builds every placement kind for `shards` executors, each paired with a
    // flag saying whether that kind is limited to one shard per CPU.
    let placements_for = |shards: usize| {
        vec![
            (false, PoolPlacement::Unbound(shards)),
            (false, PoolPlacement::Fenced(shards, cpu_set.clone())),
            (true, PoolPlacement::MaxSpread(shards, None)),
            (true, PoolPlacement::MaxSpread(shards, Some(cpu_set.clone()))),
            (true, PoolPlacement::MaxPack(shards, None)),
            (true, PoolPlacement::MaxPack(shards, Some(cpu_set.clone()))),
        ]
    };

    // test: confirm that we can always get shards up to the # of cpus
    for (_shard_limited, placement) in placements_for(nr_cpus) {
        let pool = LocalExecutorPoolBuilder::new(placement)
            .on_all_shards(|| async move {})
            .unwrap();
        pool.join_all();
    }

    // test: confirm that some placements fail when shards are # of cpus + 1
    for (shard_limited, placement) in placements_for(1 + nr_cpus) {
        let spawned = LocalExecutorPoolBuilder::new(placement).on_all_shards(|| async move {});
        match spawned {
            Err(_) => assert!(shard_limited),
            Ok(pool) => {
                pool.join_all();
                assert!(!shard_limited);
            }
        }
    }
}
3833
// Scoped tasks may borrow from the spawning stack frame: both an immediately
// awaited scoped task and one awaited after an intervening yield must have
// written through their mutable borrow of `a`.
#[test]
fn scoped_task() {
    LocalExecutor::default().run(async {
        let mut a = 1;
        // NOTE(review): presumably sound because the task is awaited before `a`
        // goes out of scope — confirm against the safety contract of
        // `spawn_scoped_local`.
        unsafe {
            crate::spawn_scoped_local(async {
                a = 2;
            })
            .await;
        }
        crate::executor().yield_task_queue_now().await;
        assert_eq!(a, 2);

        let mut a = 1;
        // Same as above, except the handle is kept and awaited later.
        let do_later = unsafe {
            crate::spawn_scoped_local(async {
                a = 2;
            })
        };

        crate::executor().yield_task_queue_now().await;
        do_later.await;
        assert_eq!(a, 2);
    });
}
3859
// When the future on every shard panics, `join_all` must still return one
// entry per shard, and each of them must be an error.
#[test]
fn executor_pool_builder_thread_panic() {
    let nr_execs = 8;
    let pool = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_execs))
        .on_all_shards(|| async move { panic!("join handle will be Err") })
        .unwrap();
    let outcomes = pool.join_all();

    assert_eq!(nr_execs, outcomes.len());
    let every_shard_failed = outcomes.iter().all(|outcome| outcome.is_err());
    assert!(every_shard_failed);
}
3871
// Every shard returns the previous value of a shared counter; together the
// eight shards must therefore return exactly `0..8`, in some order.
#[test]
fn executor_pool_builder_return_values() {
    let nr_execs = 8;
    let counter = Arc::new(AtomicUsize::new(0));

    let pool = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_execs))
        .on_all_shards(|| async move { counter.fetch_add(1, Ordering::Relaxed) })
        .unwrap();

    let mut tickets: Vec<_> = pool
        .join_all()
        .into_iter()
        .map(|outcome| outcome.unwrap())
        .collect();
    tickets.sort_unstable();

    let expected: Vec<_> = (0..nr_execs).collect();
    assert_eq!(tickets, expected);
}
3887
// Drives the pool builder's thread spawning by hand and cancels the latch
// shortly before the last shards are spawned. No shard may run its future,
// every thread must still have been spawned, and every join result must be
// an error mentioning "spawn failed".
#[test]
fn executor_pool_builder_spawn_cancel() {
    let nr_shards = 8;
    let builder = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_shards));
    let nr_executed = Arc::new(AtomicUsize::new(0));

    let fut_gen = {
        let nr_executed = Arc::clone(&nr_executed);
        || async move {
            nr_executed.fetch_add(1, Ordering::Relaxed);
            unreachable!("should not execute")
        }
    };

    let shard_total = builder.placement.executor_count();
    let mut handles = PoolThreadHandles::new();
    let mut cpu_set_gen = placement::CpuSetGenerator::pool(builder.placement.clone()).unwrap();
    let latch = Latch::new(shard_total);

    // Number of shards that are still unspawned when the latch is cancelled.
    let ii_cxl = 2;
    let cancel_at = nr_shards - ii_cxl;
    for ii in 0..shard_total {
        if ii == cancel_at {
            // Pause before cancelling so the threads spawned so far get some
            // time to run.
            std::thread::sleep(std::time::Duration::from_millis(100));
            let reported = latch.cancel().unwrap();
            assert!(reported >= ii_cxl);
        }
        match builder.spawn_thread(&mut cpu_set_gen, &latch, fut_gen.clone()) {
            Err(_) => break,
            Ok(handle) => handles.push(handle),
        }
    }

    assert_eq!(0, nr_executed.load(Ordering::Relaxed));
    assert_eq!(nr_shards, handles.handles.len());
    for outcome in handles.join_all() {
        let message = format!("{}", outcome.unwrap_err());
        assert!(message.contains("spawn failed"));
    }
}
3924
// Running a `LocalExecutor` from inside a future that is already being run
// by another `LocalExecutor` is expected to panic.
#[should_panic]
#[test]
fn executor_inception() {
    LocalExecutor::default().run(async {
        LocalExecutor::default().run(async {});
    });
}
3932
/// State shared between the two tasks of the refcount-underflow tests below.
enum TaskState {
    /// The first task has not been released yet; holds its waker once it has
    /// been polled and has parked itself.
    Pending(Option<Waker>),
    /// The second task has woken the first one, which may now complete.
    Ready,
}
3937
// The following four tests are regression tests for https://github.com/DataDog/glommio/issues/379.
// They check for task reference count underflow.
// Two scenarios are covered, one that awaits join handles and one that
// sleeps, and each scenario is tested with both `wake` and `wake_by_ref`.
#[test]
fn wake_by_ref_refcount_underflow_with_join_handle() {
    LocalExecutor::default().run(async {
        let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
        let cloned_slot = slot.clone();
        let jh = crate::spawn_local(async move {
            // first task, places waker of self into slot, when polled checks for result, if
            // it's ready, returns Ready, otherwise return Pending
            poll_fn::<(), _>(|cx| {
                let current = &mut *cloned_slot.borrow_mut();
                match current {
                    TaskState::Pending(maybe_waker) => match maybe_waker {
                        // A second poll while still pending is not expected.
                        Some(_) => unreachable!(),
                        None => {
                            *current = TaskState::Pending(Some(cx.waker().clone()));
                            Poll::Pending
                        }
                    },
                    TaskState::Ready => Poll::Ready(()),
                }
            })
            .await;
        })
        .detach();
        let jh2 = crate::spawn_local(async move {
            // second task, checks slot for first task waker, wakes it by ref, and then it
            // is dropped.
            let current = &mut *slot.borrow_mut();
            match current {
                TaskState::Pending(maybe_waker) => {
                    let waker = maybe_waker.take().unwrap();
                    waker.wake_by_ref();
                    *current = TaskState::Ready; // <-- waker dropped here, refcount is zero
                }
                TaskState::Ready => unreachable!(), // task cannot be ready at this time
            }
        })
        .detach();
        join_all(vec![jh, jh2]).await;
    });
}
3983
// Refcount-underflow regression test, `wake_by_ref` variant: both tasks are
// detached and the main future just sleeps for 1ms instead of awaiting them.
#[test]
fn wake_by_ref_refcount_underflow_with_sleep() {
    LocalExecutor::default().run(async {
        let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
        let cloned_slot = slot.clone();
        // First task: parks its own waker in the slot and completes once the
        // slot has been switched to `Ready`.
        crate::spawn_local(async move {
            poll_fn::<(), _>(|cx| {
                let current = &mut *cloned_slot.borrow_mut();
                match current {
                    TaskState::Pending(maybe_waker) => match maybe_waker {
                        Some(_) => unreachable!(),
                        None => {
                            *current = TaskState::Pending(Some(cx.waker().clone()));
                            Poll::Pending
                        }
                    },
                    TaskState::Ready => Poll::Ready(()),
                }
            })
            .await;
        })
        .detach();
        // Second task: takes the parked waker, wakes it by reference and marks
        // the slot `Ready`; the waker is dropped when the match arm ends.
        crate::spawn_local(async move {
            let current = &mut *slot.borrow_mut();
            match current {
                TaskState::Pending(maybe_waker) => {
                    let waker = maybe_waker.take().unwrap();
                    waker.wake_by_ref();
                    *current = TaskState::Ready;
                }
                TaskState::Ready => unreachable!(),
            }
        })
        .detach();
        timer::sleep(Duration::from_millis(1)).await;
    });
}
4021
// Refcount-underflow regression test, `wake` (by value) variant: the main
// future awaits the join handles of both detached tasks.
#[test]
fn wake_refcount_underflow_with_join_handle() {
    LocalExecutor::default().run(async {
        let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
        let cloned_slot = slot.clone();
        // First task: parks its own waker in the slot and completes once the
        // slot has been switched to `Ready`.
        let jh = crate::spawn_local(async move {
            poll_fn::<(), _>(|cx| {
                let current = &mut *cloned_slot.borrow_mut();
                match current {
                    TaskState::Pending(maybe_waker) => match maybe_waker {
                        Some(_) => unreachable!(),
                        None => {
                            *current = TaskState::Pending(Some(cx.waker().clone()));
                            Poll::Pending
                        }
                    },
                    TaskState::Ready => Poll::Ready(()),
                }
            })
            .await;
        })
        .detach();
        // Second task: takes the parked waker and consumes it with `wake()`
        // before marking the slot `Ready`.
        let jh2 = crate::spawn_local(async move {
            let current = &mut *slot.borrow_mut();
            match current {
                TaskState::Pending(maybe_waker) => {
                    let waker = maybe_waker.take().unwrap();
                    waker.wake();
                    *current = TaskState::Ready;
                }
                TaskState::Ready => unreachable!(),
            }
        })
        .detach();
        join_all(vec![jh, jh2]).await;
    });
}
4059
// Refcount-underflow regression test, `wake` (by value) variant: both tasks
// are detached and the main future just sleeps for 1ms instead of awaiting
// them.
#[test]
fn wake_refcount_underflow_with_sleep() {
    LocalExecutor::default().run(async {
        let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
        let cloned_slot = slot.clone();
        // First task: parks its own waker in the slot and completes once the
        // slot has been switched to `Ready`.
        crate::spawn_local(async move {
            poll_fn::<(), _>(|cx| {
                let current = &mut *cloned_slot.borrow_mut();
                match current {
                    TaskState::Pending(maybe_waker) => match maybe_waker {
                        Some(_) => unreachable!(),
                        None => {
                            *current = TaskState::Pending(Some(cx.waker().clone()));
                            Poll::Pending
                        }
                    },
                    TaskState::Ready => Poll::Ready(()),
                }
            })
            .await;
        })
        .detach();
        // Second task: takes the parked waker and consumes it with `wake()`
        // before marking the slot `Ready`.
        crate::spawn_local(async move {
            let current = &mut *slot.borrow_mut();
            match current {
                TaskState::Pending(maybe_waker) => {
                    let waker = maybe_waker.take().unwrap();
                    waker.wake();
                    *current = TaskState::Ready;
                }
                TaskState::Ready => unreachable!(),
            }
        })
        .detach();
        timer::sleep(Duration::from_millis(1)).await;
    });
}
4097
// Runs a 100ms busy loop through `spawn_blocking` concurrently with a 100ms
// cooperative loop on the executor. Both must finish between 100ms and
// 150ms after the common start, i.e. neither may have delayed the other.
#[test]
fn blocking_function() {
    let runtime = LocalExecutor::default();
    runtime.run(async {
        let started = Instant::now();

        let on_pool = executor().spawn_blocking(enclose!((started) move || {
            let spin_start = Instant::now();
            while spin_start.elapsed() < Duration::from_millis(100) {}
            started.elapsed()
        }));
        let on_executor = enclose!((started) async move {
            let spin_start = Instant::now();
            loop {
                if spin_start.elapsed() >= Duration::from_millis(100) {
                    break;
                }
                yield_if_needed().await;
            }
            started.elapsed()
        });

        let (pool_done, coop_done) = join(on_pool, on_executor).await;

        assert!((100..150).contains(&pool_done.as_millis()));
        assert!((100..150).contains(&coop_done.as_millis()));
    });
}
4122
// Submits five 100ms blocking jobs to a blocking pool configured with
// `PoolPlacement::Unbound(4)` and checks their completion times.
#[test]
fn blocking_function_parallelism() {
    let spawned = LocalExecutorBuilder::new(Placement::Unbound)
        .blocking_thread_pool_placement(PoolPlacement::Unbound(4))
        .spawn(|| async {
            let started = Instant::now();
            let mut jobs = vec![];

            for _ in 0..5 {
                let job = executor().spawn_blocking(enclose!((started) move || {
                    let spin_start = Instant::now();
                    while spin_start.elapsed() < Duration::from_millis(100) {}
                    started.elapsed()
                }));
                jobs.push(job);
            }

            // Five jobs of 100ms each were created, but the thread pool only
            // has four threads, so one of the jobs is expected to take twice
            // as long as the others.
            let mut finish_times = join_all(jobs).await;
            assert_eq!(finish_times.len(), 5);

            finish_times.sort_unstable();
            let (first_wave, straggler) = finish_times.split_at(4);
            assert!(first_wave
                .iter()
                .all(|t| (100..150).contains(&t.as_millis())));
            assert!((200..250).contains(&straggler[0].as_millis()));
        })
        .unwrap();

    spawned.join().unwrap();
}
4155
// An executor pinned with `Placement::Fixed(0)` but using an unbound
// blocking pool must run blocking jobs on threads that can be scheduled on
// at least as many CPUs as the test thread itself. Skipped when fewer than
// two CPUs are accessible.
#[test]
fn blocking_function_placement_independent_of_executor_placement() {
    // Pid 0 refers to the calling thread.
    let inherited = nix::sched::sched_getaffinity(nix::unistd::Pid::from_raw(0)).unwrap();
    let num_cpus_accessible_by_default = (0..nix::sched::CpuSet::count())
        .filter(|&cpu| inherited.is_set(cpu).unwrap())
        .count();
    if num_cpus_accessible_by_default < 2 {
        eprintln!(
            "Insufficient CPUs available to test blocking_function_placement_independent_of_executor_placement (affinity only allows for {})",
            num_cpus_accessible_by_default,
        );
        return;
    }

    let pinned = LocalExecutorBuilder::new(Placement::Fixed(0))
        .blocking_thread_pool_placement(PoolPlacement::Unbound(2))
        .spawn(|| async {
            // Counts the CPUs in the affinity mask of whichever thread runs it.
            let count_schedulable = move || {
                let this_thread = nix::unistd::Pid::from_raw(0);
                let mask =
                    nix::sched::sched_getaffinity(this_thread).expect("Failed to get affinity");
                (0..nix::sched::CpuSet::count())
                    .filter(|&cpu| {
                        mask.is_set(cpu)
                            .expect("Failed to check if cpu affinity is set")
                    })
                    .count()
            };
            executor().spawn_blocking(count_schedulable).await
        })
        .unwrap();
    let num_schedulable_cpus = pinned.join().unwrap();

    assert!(
        num_schedulable_cpus >= num_cpus_accessible_by_default,
        "num schedulable {}, num cpus accessible {}",
        num_schedulable_cpus,
        num_cpus_accessible_by_default,
    );
}
4200
// With a zero-thread blocking pool the executor thread still spawns, but
// joining it must report an error.
#[test]
fn blocking_pool_invalid_placement() {
    let builder = LocalExecutorBuilder::new(Placement::Unbound)
        .blocking_thread_pool_placement(PoolPlacement::Unbound(0));
    let spawned = builder.spawn(|| async {}).unwrap();
    let outcome = spawned.join();
    assert!(outcome.is_err());
}
4210
// After `run` returns, `LOCAL_EX` must no longer be set on this thread.
#[test]
fn local_executor_unset() {
    LocalExecutor::default().run(async {});

    // Without the `native-tls` feature, `LOCAL_EX` is queried through `is_set()`.
    #[cfg(not(feature = "native-tls"))]
    assert!(!LOCAL_EX.is_set());

    // With `native-tls`, `LOCAL_EX` is checked for null inside an unsafe block.
    #[cfg(feature = "native-tls")]
    assert!(unsafe { LOCAL_EX.is_null() });
}
4221
// `LOCAL_EX` must also be cleared when the future passed to `run` panics
// and the panic is caught by the caller.
#[test]
fn local_executor_unset_when_panic() {
    let res = std::panic::catch_unwind(|| {
        LocalExecutor::default().run(async {
            panic!("uh oh!");
        });
    });
    // The panic must have propagated out of `run`.
    assert!(res.is_err());

    // Without the `native-tls` feature, `LOCAL_EX` is queried through `is_set()`.
    #[cfg(not(feature = "native-tls"))]
    assert!(!LOCAL_EX.is_set());

    // With `native-tls`, `LOCAL_EX` is checked for null inside an unsafe block.
    #[cfg(feature = "native-tls")]
    assert!(unsafe { LOCAL_EX.is_null() });
}
4237}