glommio/executor/
mod.rs

1// Unless explicitly stated otherwise all files in this repository are licensed
2// under the MIT/Apache-2.0 License, at your convenience
3//
4// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
5//
6//! Async executor.
7//!
8//! This crate offers two kinds of executors: single-threaded and
9//! multi-threaded.
10//!
11//! # Examples
12//!
13//! Run four single-threaded executors concurrently:
14//!
15//! ```
16//! use glommio::{
17//!     timer::Timer,
18//!     LocalExecutor,
19//!     LocalExecutorBuilder,
20//!     LocalExecutorPoolBuilder,
21//!     PoolPlacement,
22//! };
23//!
24//! LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
25//!     .on_all_shards(move || async {
26//!         Timer::new(std::time::Duration::from_millis(100)).await;
27//!         println!("Hello world!");
28//!     })
29//!     .expect("failed to spawn local executors")
30//!     .join_all();
31//! ```
32
33#![warn(missing_docs, missing_debug_implementations)]
34
35use crate::{
36    error::BuilderErrorKind,
37    executor::stall::StallDetector,
38    io::DmaBuffer,
39    parking, reactor,
40    sys::{self, blocking::BlockingThreadPool},
41    task::{self, waker_fn::dummy_waker},
42    GlommioError, IoRequirements, IoStats, Latency, Reactor, Shares,
43};
44use ahash::AHashMap;
45use futures_lite::pin;
46use latch::{Latch, LatchState};
47use log::warn;
48pub use placement::{CpuSet, Placement, PoolPlacement};
49use std::{
50    cell::RefCell,
51    collections::{hash_map::Entry, BinaryHeap},
52    fmt,
53    future::Future,
54    io,
55    marker::PhantomData,
56    mem::MaybeUninit,
57    ops::{Deref, DerefMut},
58    pin::Pin,
59    rc::Rc,
60    sync::{Arc, Mutex},
61    task::{Context, Poll},
62    thread::{Builder, JoinHandle},
63    time::{Duration, Instant},
64};
65use tracing::trace;
66
67mod latch;
68mod multitask;
69mod placement;
70pub mod stall;
71
/// Base name given to executors whose builder was not given an explicit name.
pub(crate) const DEFAULT_EXECUTOR_NAME: &str = "unnamed";
/// Preempt timer period (100ms) used when a builder does not override it.
pub(crate) const DEFAULT_PREEMPT_TIMER: Duration = Duration::from_millis(100);
/// I/O memory reservation used when a builder does not override it:
/// `10 << 20` bytes, i.e. 10 MiB.
pub(crate) const DEFAULT_IO_MEMORY: usize = 10 << 20;
/// IO ring depth used when a builder does not override it.
pub(crate) const DEFAULT_RING_SUBMISSION_DEPTH: usize = 128;

/// Result type alias that removes the need to specify a type parameter
/// that's only valid in the channel variants of the error. Otherwise, it
/// might be confused with the error (`E`) that a result usually has in
/// the second type parameter.
type Result<T> = crate::Result<T, ()>;

// With the `native-tls` feature, the current executor is tracked through a raw
// pointer stored in a `#[thread_local]` static. It starts out null, which
// `executor_id` reports as `None`.
#[cfg(feature = "native-tls")]
#[thread_local]
static mut LOCAL_EX: *const LocalExecutor = std::ptr::null();

// Without the `native-tls` feature, the current executor is tracked through a
// `scoped_tls` key; `LOCAL_EX.is_set()` tells whether one is installed.
#[cfg(not(feature = "native-tls"))]
scoped_tls::scoped_thread_local!(static LOCAL_EX: LocalExecutor);
89
/// Returns a proxy struct to the [`LocalExecutor`]
///
/// The returned [`ExecutorProxy`] carries no fields, so creating it is free
/// and it can be requested as often as needed.
#[inline(always)]
pub fn executor() -> ExecutorProxy {
    ExecutorProxy {}
}
95
96pub(crate) fn executor_id() -> Option<usize> {
97    #[cfg(not(feature = "native-tls"))]
98    {
99        if LOCAL_EX.is_set() {
100            Some(LOCAL_EX.with(|ex| ex.id))
101        } else {
102            None
103        }
104    }
105
106    #[cfg(feature = "native-tls")]
107    unsafe {
108        LOCAL_EX.as_ref().map(|ex| ex.id)
109    }
110}
111
/// An opaque handle indicating in which queue a group of tasks will execute.
///
/// Tasks in the same group will execute in FIFO order but no guarantee is made
/// about ordering on different task queues.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct TaskQueueHandle {
    index: usize,
}

impl TaskQueueHandle {
    /// Returns a numeric ID that uniquely identifies this Task queue
    pub fn index(&self) -> usize {
        let Self { index } = *self;
        index
    }
}
126
/// A queue of tasks together with the bookkeeping used to order queues
/// against each other by `vruntime`.
#[derive(Debug)]
pub(crate) struct TaskQueue {
    /// The executor tasks are fetched from (see `get_task`).
    pub(crate) ex: Rc<multitask::LocalExecutor>,
    /// Activity flag; refreshed from `ex.is_active()` in `account_vruntime`.
    active: bool,
    /// Shares configured for this queue.
    shares: Shares,
    /// Ordering key of the queue; grows by the share-scaled runtime.
    vruntime: u64,
    /// I/O requirements attached to this queue.
    io_requirements: IoRequirements,
    /// Name given to the queue at creation.
    name: String,
    /// When the reciprocal shares were last re-sampled (only updated for
    /// dynamic shares, see `prepare_to_run`).
    last_adjustment: Instant,
    // for dynamic shares classes
    /// Cleared at the beginning of `prepare_to_run`.
    yielded: bool,
    /// Statistics accumulated for this queue.
    stats: TaskQueueStats,
}
140
141// Impl a custom order so we use a min-heap
142impl Ord for TaskQueue {
143    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
144        other.vruntime.cmp(&self.vruntime)
145    }
146}
147
148impl PartialOrd for TaskQueue {
149    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150        Some(other.vruntime.cmp(&self.vruntime))
151    }
152}
153
154impl PartialEq for TaskQueue {
155    fn eq(&self, other: &Self) -> bool {
156        self.vruntime == other.vruntime
157    }
158}
159
160impl Eq for TaskQueue {}
161
162impl TaskQueue {
163    fn new<S>(
164        index: TaskQueueHandle,
165        name: S,
166        shares: Shares,
167        ioreq: IoRequirements,
168    ) -> Rc<RefCell<Self>>
169    where
170        S: Into<String>,
171    {
172        Rc::new(RefCell::new(TaskQueue {
173            ex: Rc::new(multitask::LocalExecutor::new()),
174            active: false,
175            stats: TaskQueueStats::new(index, shares.reciprocal_shares()),
176            shares,
177            vruntime: 0,
178            io_requirements: ioreq,
179            name: name.into(),
180            last_adjustment: Instant::now(),
181            yielded: false,
182        }))
183    }
184
185    fn is_active(&self) -> bool {
186        self.active
187    }
188
189    fn get_task(&mut self) -> Option<multitask::Runnable> {
190        self.ex.get_task()
191    }
192
193    fn yielded(&self) -> bool {
194        self.yielded
195    }
196
197    fn prepare_to_run(&mut self, now: Instant) {
198        self.yielded = false;
199        if let Shares::Dynamic(bm) = &self.shares {
200            if now.saturating_duration_since(self.last_adjustment) > bm.adjustment_period() {
201                self.last_adjustment = now;
202                self.stats.reciprocal_shares = self.shares.reciprocal_shares();
203            }
204        }
205    }
206
207    fn account_vruntime(&mut self, delta: Duration) -> Option<u64> {
208        let delta_scaled = (self.stats.reciprocal_shares * (delta.as_nanos() as u64)) >> 12;
209        self.stats.runtime += delta;
210        self.stats.queue_selected += 1;
211        self.active = self.ex.is_active();
212
213        let vruntime = self.vruntime.checked_add(delta_scaled);
214        if let Some(x) = vruntime {
215            self.vruntime = x;
216        }
217        vruntime
218    }
219}
220
221pub(crate) fn bind_to_cpu_set(cpus: impl IntoIterator<Item = usize>) -> Result<()> {
222    let mut cpuset = nix::sched::CpuSet::new();
223    for cpu in cpus {
224        cpuset.set(cpu).map_err(|e| to_io_error!(e))?;
225    }
226    let pid = nix::unistd::Pid::from_raw(0);
227    nix::sched::sched_setaffinity(pid, &cpuset).map_err(|e| Into::into(to_io_error!(e)))
228}
229
// The stats are handed out by value: sharing them by reference would drag in
// `Rc`s and `RefCell`s. Copies should happen infrequently, and the struct is
// small enough that copying it is cheap.
/// Allows information about the current state of this executor to be consumed
/// by applications.
#[derive(Clone, Copy, Debug, Default)]
pub struct ExecutorStats {
    executor_runtime: Duration,
    // total_runtime include poll_io time, exclude spin loop time
    total_runtime: Duration,
    scheduler_runs: u64,
    tasks_executed: u64,
}

impl ExecutorStats {
    /// Creates a set of stats with every duration and counter at zero.
    fn new() -> Self {
        Self::default()
    }

    /// The total amount of runtime in this executor so far.
    ///
    /// This is especially important for spinning executors, since the amount of
    /// CPU time you will see in the operating system will be a far cry from
    /// the CPU time it actually spent executing. Sleeping or spinning are
    /// not accounted here.
    pub fn executor_runtime(&self) -> Duration {
        let Self {
            executor_runtime, ..
        } = *self;
        executor_runtime
    }

    /// The total amount of runtime in this executor, plus poll io time
    pub fn total_runtime(&self) -> Duration {
        let Self { total_runtime, .. } = *self;
        total_runtime
    }

    /// Returns the number of times the scheduler loop was called.
    ///
    /// The Glommio scheduler selects a task queue to run and runs many tasks
    /// in that task queue. This number corresponds to the number of times the
    /// scheduler was called upon to select a new queue.
    pub fn scheduler_runs(&self) -> u64 {
        let Self { scheduler_runs, .. } = *self;
        scheduler_runs
    }

    /// Returns the number of tasks executed in the system, over all queues.
    pub fn tasks_executed(&self) -> u64 {
        let Self { tasks_executed, .. } = *self;
        tasks_executed
    }
}
283
284#[derive(Debug, Copy, Clone)]
285/// Allows information about the current state of a particular task queue to be
286/// consumed by applications.
287pub struct TaskQueueStats {
288    index: TaskQueueHandle,
289    // so we can easily produce a handle
290    reciprocal_shares: u64,
291    queue_selected: u64,
292    runtime: Duration,
293}
294
295impl TaskQueueStats {
296    fn new(index: TaskQueueHandle, reciprocal_shares: u64) -> Self {
297        Self {
298            index,
299            reciprocal_shares,
300            runtime: Duration::from_nanos(0),
301            queue_selected: 0,
302        }
303    }
304
305    /// Returns a numeric ID that uniquely identifies this Task queue
306    pub fn index(&self) -> TaskQueueHandle {
307        self.index
308    }
309
310    /// Returns the current number of shares in this task queue.
311    ///
312    /// If the task queue is configured to use static shares this will never
313    /// change. If the task queue is configured to use dynamic shares, this
314    /// returns a sample of the shares values the last time the scheduler
315    /// ran.
316    pub fn current_shares(&self) -> usize {
317        ((1u64 << 22) / self.reciprocal_shares) as usize
318    }
319
320    /// Returns the accumulated runtime this task queue had received since the
321    /// beginning of its execution
322    pub fn runtime(&self) -> Duration {
323        self.runtime
324    }
325
326    /// Returns the number of times this queue was selected to be executed. In
327    /// conjunction with the runtime, you can extract an average of the
328    /// amount of time this queue tends to run for
329    pub fn queue_selected(&self) -> u64 {
330        self.queue_selected
331    }
332
333    pub(crate) fn take(&mut self) -> Self {
334        std::mem::replace(
335            self,
336            Self {
337                index: self.index,
338                reciprocal_shares: self.reciprocal_shares,
339                queue_selected: Default::default(),
340                runtime: Default::default(),
341            },
342        )
343    }
344}
345
346#[derive(Debug)]
347struct ExecutorQueues {
348    active_executors: BinaryHeap<Rc<RefCell<TaskQueue>>>,
349    available_executors: AHashMap<usize, Rc<RefCell<TaskQueue>>>,
350    active_executing: Option<Rc<RefCell<TaskQueue>>>,
351    executor_index: usize,
352    default_vruntime: u64,
353    preempt_timer_duration: Duration,
354    default_preempt_timer_duration: Duration,
355    spin_before_park: Option<Duration>,
356    stats: ExecutorStats,
357}
358
359impl ExecutorQueues {
360    fn new(preempt_timer_duration: Duration, spin_before_park: Option<Duration>) -> Self {
361        ExecutorQueues {
362            active_executors: BinaryHeap::new(),
363            available_executors: AHashMap::new(),
364            active_executing: None,
365            executor_index: 1, // 0 is the default
366            default_vruntime: 0,
367            preempt_timer_duration,
368            default_preempt_timer_duration: preempt_timer_duration,
369            spin_before_park,
370            stats: ExecutorStats::new(),
371        }
372    }
373
374    fn reevaluate_preempt_timer(&mut self) {
375        self.preempt_timer_duration = self
376            .active_executors
377            .iter()
378            .map(|tq| match tq.borrow().io_requirements.latency_req {
379                Latency::NotImportant => self.default_preempt_timer_duration,
380                Latency::Matters(d) => d,
381            })
382            .min()
383            .unwrap_or(self.default_preempt_timer_duration)
384    }
385
386    fn maybe_activate(&mut self, queue: Rc<RefCell<TaskQueue>>) {
387        let mut state = queue.borrow_mut();
388        if !state.is_active() {
389            state.vruntime = self.default_vruntime + 1;
390            state.active = true;
391            drop(state);
392            self.active_executors.push(queue);
393            self.reevaluate_preempt_timer();
394        }
395    }
396}
397
398/// A wrapper around a [`std::thread::JoinHandle`]
399#[derive(Debug)]
400pub struct ExecutorJoinHandle<T: Send + 'static>(JoinHandle<Result<T>>);
401
402impl<T: Send + 'static> ExecutorJoinHandle<T> {
403    /// See [`std::thread::JoinHandle::thread()`]
404    #[must_use]
405    pub fn thread(&self) -> &std::thread::Thread {
406        self.0.thread()
407    }
408
409    /// See [`std::thread::JoinHandle::join()`]
410    pub fn join(self) -> Result<T> {
411        match self.0.join() {
412            Err(err) => Err(GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(
413                err,
414            ))),
415            Ok(Err(err)) => Err(err),
416            Ok(Ok(res)) => Ok(res),
417        }
418    }
419}
420
/// A factory that can be used to configure and create a [`LocalExecutor`].
///
/// Methods can be chained on it in order to configure it.
///
/// The [`spawn`] method will take ownership of the builder and return a
/// `Result` holding a join handle for the thread running the
/// [`LocalExecutor`] created with the given configuration.
///
/// The [`LocalExecutor::default`] function uses a Builder with default
/// configuration and unwraps its return value.
///
/// You may want to use [`LocalExecutorBuilder::spawn`] instead of
/// [`LocalExecutor::default`], when you want to recover from a failure to
/// launch a thread. The [`LocalExecutor::default`] function will panic where
/// the Builder method will return a `Result`.
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutorBuilder;
///
/// let builder = LocalExecutorBuilder::default();
/// let ex = builder.make().unwrap();
/// ```
///
/// [`LocalExecutor`]: struct.LocalExecutor.html
///
/// [`LocalExecutor::default`]: struct.LocalExecutor.html#method.default
///
/// [`LocalExecutorBuilder::spawn`]:
/// struct.LocalExecutorBuilder.html#method.spawn
///
/// [`spawn`]: struct.LocalExecutorBuilder.html#method.spawn
#[derive(Debug)]
pub struct LocalExecutorBuilder {
    /// The placement policy for the [`LocalExecutor`] to create
    placement: Placement,
    /// Spin for duration before parking a reactor
    spin_before_park: Option<Duration>,
    /// A name for the thread-to-be (if any), for identification in panic
    /// messages
    name: String,
    /// Amount of memory to reserve for storage I/O. This will be preallocated
    /// and registered with io_uring. It is still possible to use more than
    /// that, but it will come from the standard allocator and performance
    /// will suffer. Defaults to 10 MiB.
    io_memory: usize,
    /// The depth of the IO rings to create. This influences the level of IO
    /// concurrency. A higher ring depth allows a shard to submit a
    /// greater number of IO requests to the kernel at once.
    ring_depth: usize,
    /// How often to yield to other task queues
    preempt_timer_duration: Duration,
    /// Whether to record the latencies of individual IO requests
    record_io_latencies: bool,
    /// The placement policy of the blocking thread pool
    /// Defaults to one thread using the same placement strategy as the host
    /// executor
    blocking_thread_pool_placement: PoolPlacement,
    /// Whether to detect stalls in unyielding tasks.
    /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
    /// [`nix::libc::SIGUSR1`], so is disabled by default.
    detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}
484
485impl LocalExecutorBuilder {
486    /// Generates the base configuration for spawning a [`LocalExecutor`], from
487    /// which configuration methods can be chained.
488    /// The method's only argument is the [`Placement`] policy by which the
489    /// [`LocalExecutor`] is bound to the machine's hardware topology. i.e.
490    /// how many and which CPUs to use.
491    pub fn new(placement: Placement) -> LocalExecutorBuilder {
492        LocalExecutorBuilder {
493            placement: placement.clone(),
494            spin_before_park: None,
495            name: String::from(DEFAULT_EXECUTOR_NAME),
496            io_memory: DEFAULT_IO_MEMORY,
497            ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
498            preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
499            record_io_latencies: false,
500            blocking_thread_pool_placement: PoolPlacement::from(placement),
501            detect_stalls: None,
502        }
503    }
504
505    /// Spin for duration before parking a reactor
506    #[must_use = "The builder must be built to be useful"]
507    pub fn spin_before_park(mut self, spin: Duration) -> LocalExecutorBuilder {
508        self.spin_before_park = Some(spin);
509        self
510    }
511
512    /// Names the thread-to-be. Currently, the name is used for identification
513    /// only in panic messages.
514    #[must_use = "The builder must be built to be useful"]
515    pub fn name(mut self, name: &str) -> LocalExecutorBuilder {
516        self.name = String::from(name);
517        self
518    }
519
520    /// Amount of memory to reserve for storage I/O. This will be preallocated
521    /// and registered with io_uring. It is still possible to use more than
522    /// that, but it will come from the standard allocator and performance
523    /// will suffer.
524    ///
525    /// The system will always try to allocate at least 64 kiB for I/O memory,
526    /// and the default is 10 MiB.
527    #[must_use = "The builder must be built to be useful"]
528    pub fn io_memory(mut self, io_memory: usize) -> LocalExecutorBuilder {
529        self.io_memory = io_memory;
530        self
531    }
532
533    /// The depth of the IO rings to create. This influences the level of IO
534    /// concurrency. A higher ring depth allows a shard to submit a
535    /// greater number of IO requests to the kernel at once.
536    ///
537    /// Values above zero are valid and the default is 128.
538    #[must_use = "The builder must be built to be useful"]
539    pub fn ring_depth(mut self, ring_depth: usize) -> LocalExecutorBuilder {
540        assert!(ring_depth > 0, "ring depth should be strictly positive");
541        self.ring_depth = ring_depth;
542        self
543    }
544
545    /// How often [`need_preempt`] will return true by default.
546    ///
547    /// Lower values mean task queues will switch execution more often, which
548    /// can help latency but harm throughput. When individual task queues
549    /// are present, this value can still be dynamically lowered through the
550    /// [`Latency`] setting.
551    ///
552    /// Default is 100ms.
553    ///
554    /// [`need_preempt`]: ExecutorProxy::need_preempt
555    /// [`Latency`]: crate::Latency
556    #[must_use = "The builder must be built to be useful"]
557    pub fn preempt_timer(mut self, dur: Duration) -> LocalExecutorBuilder {
558        self.preempt_timer_duration = dur;
559        self
560    }
561
562    /// Whether to record the latencies of individual IO requests as part of the
563    /// IO stats. Recording latency can be expensive. Disabled by default.
564    #[must_use = "The builder must be built to be useful"]
565    pub fn record_io_latencies(mut self, enabled: bool) -> LocalExecutorBuilder {
566        self.record_io_latencies = enabled;
567        self
568    }
569
570    /// The placement policy of the blocking thread pool.
571    /// Defaults to one thread using the same placement strategy as the host
572    /// executor.
573    #[must_use = "The builder must be built to be useful"]
574    pub fn blocking_thread_pool_placement(
575        mut self,
576        placement: PoolPlacement,
577    ) -> LocalExecutorBuilder {
578        self.blocking_thread_pool_placement = placement;
579        self
580    }
581
582    /// Whether to detect stalls in unyielding tasks.
583    /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
584    /// [`nix::libc::SIGUSR1`], so is disabled by default.
585    /// # Examples
586    ///
587    /// ```
588    /// use glommio::{DefaultStallDetectionHandler, LocalExecutorBuilder};
589    ///
590    /// let local_ex = LocalExecutorBuilder::default()
591    ///     .detect_stalls(Some(Box::new(DefaultStallDetectionHandler {})))
592    ///     .make()
593    ///     .unwrap();
594    /// ```
595    #[must_use = "The builder must be built to be useful"]
596    pub fn detect_stalls(
597        mut self,
598        handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
599    ) -> Self {
600        self.detect_stalls = handler;
601        self
602    }
603
604    /// Make a new [`LocalExecutor`] by taking ownership of the Builder, and
605    /// returns a [`Result`](crate::Result) to the executor.
606    /// # Examples
607    ///
608    /// ```
609    /// use glommio::LocalExecutorBuilder;
610    ///
611    /// let local_ex = LocalExecutorBuilder::default().make().unwrap();
612    /// ```
613    pub fn make(self) -> Result<LocalExecutor> {
614        let notifier = sys::new_sleep_notifier()?;
615        let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?;
616        let mut le = LocalExecutor::new(
617            notifier,
618            cpu_set_gen.next().cpu_binding(),
619            LocalExecutorConfig {
620                io_memory: self.io_memory,
621                ring_depth: self.ring_depth,
622                preempt_timer: self.preempt_timer_duration,
623                record_io_latencies: self.record_io_latencies,
624                spin_before_park: self.spin_before_park,
625                thread_pool_placement: self.blocking_thread_pool_placement,
626                detect_stalls: self.detect_stalls,
627            },
628        )?;
629        le.init();
630        Ok(le)
631    }
632
633    /// Spawn a new [`LocalExecutor`] in a new thread with a given task.
634    ///
635    /// This `spawn` function is an ergonomic shortcut for calling
636    /// `std::thread::spawn`, [`LocalExecutorBuilder::make`] in the spawned
637    /// thread, and then [`LocalExecutor::run`]. This `spawn` function takes
638    /// ownership of a [`LocalExecutorBuilder`] with the configuration for
639    /// the [`LocalExecutor`], spawns that executor in a new thread, and starts
640    /// the task given by `fut_gen()` in that thread.
641    ///
642    /// The indirection of `fut_gen()` here (instead of taking a `Future`)
643    /// allows for futures that may not be `Send`-able once started. As this
644    /// executor is thread-local it can guarantee that the futures will not
645    /// be Sent once started.
646    ///
647    /// # Panics
648    ///
649    /// The newly spawned thread panics if creating the executor fails. If you
650    /// need more fine-grained error handling consider initializing those
651    /// entities manually.
652    ///
653    /// # Example
654    ///
655    /// ```
656    /// use glommio::LocalExecutorBuilder;
657    ///
658    /// let handle = LocalExecutorBuilder::default()
659    ///     .spawn(|| async move {
660    ///         println!("hello");
661    ///     })
662    ///     .unwrap();
663    ///
664    /// handle.join().unwrap();
665    /// ```
666    ///
667    /// [`LocalExecutor`]: struct.LocalExecutor.html
668    ///
669    /// [`LocalExecutorBuilder`]: struct.LocalExecutorBuilder.html
670    ///
671    /// [`LocalExecutorBuilder::make`]:
672    /// struct.LocalExecutorBuilder.html#method.make
673    ///
674    /// [`LocalExecutor::run`]:struct.LocalExecutor.html#method.run
675    #[must_use = "This spawns an executor on a thread, so you may need to call \
676                  `JoinHandle::join()` to keep the main thread alive"]
677    pub fn spawn<G, F, T>(self, fut_gen: G) -> Result<ExecutorJoinHandle<T>>
678    where
679        G: FnOnce() -> F + Send + 'static,
680        F: Future<Output = T> + 'static,
681        T: Send + 'static,
682    {
683        let notifier = sys::new_sleep_notifier()?;
684        let name = format!("{}-{}", self.name, notifier.id());
685        let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?;
686        let io_memory = self.io_memory;
687        let ring_depth = self.ring_depth;
688        let preempt_timer_duration = self.preempt_timer_duration;
689        let spin_before_park = self.spin_before_park;
690        let detect_stalls = self.detect_stalls;
691        let record_io_latencies = self.record_io_latencies;
692        let blocking_thread_pool_placement = self.blocking_thread_pool_placement;
693
694        Builder::new()
695            .name(name)
696            .spawn(move || {
697                let mut le = LocalExecutor::new(
698                    notifier,
699                    cpu_set_gen.next().cpu_binding(),
700                    LocalExecutorConfig {
701                        io_memory,
702                        ring_depth,
703                        preempt_timer: preempt_timer_duration,
704                        record_io_latencies,
705                        spin_before_park,
706                        thread_pool_placement: blocking_thread_pool_placement,
707                        detect_stalls,
708                    },
709                )?;
710                le.init();
711                le.run(async move { Ok(fut_gen().await) })
712            })
713            .map_err(Into::into)
714            .map(ExecutorJoinHandle)
715    }
716}
717
718impl Default for LocalExecutorBuilder {
719    fn default() -> Self {
720        Self::new(Placement::Unbound)
721    }
722}
723
/// A factory to configure and create a pool of [`LocalExecutor`]s.
///
/// Configuration methods apply their settings to all [`LocalExecutor`]s in the
/// pool unless otherwise specified.  Methods can be chained on the builder in
/// order to configure it.  The [`Self::on_all_shards`] method will take
/// ownership of the builder and create a [`PoolThreadHandles`] struct which can
/// be used to join the executor threads.
///
/// # Example
///
/// ```
/// use glommio::{LocalExecutorPoolBuilder, PoolPlacement};
///
/// let handles = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
///     .on_all_shards(|| async move {
///         let id = glommio::executor().id();
///         println!("hello from executor {id}");
///     })
///     .unwrap();
///
/// handles.join_all();
/// ```
pub struct LocalExecutorPoolBuilder {
    /// Spin for duration before parking a reactor
    spin_before_park: Option<Duration>,
    /// A name for the thread-to-be (if any), for identification in panic
    /// messages. Each executor in the pool will use this name followed by
    /// a hyphen and numeric id (e.g. `myname-1`).
    name: String,
    /// Amount of memory to reserve for storage I/O. This will be preallocated
    /// and registered with io_uring. It is still possible to use more than
    /// that, but it will come from the standard allocator and performance
    /// will suffer. Defaults to 10 MiB.
    io_memory: usize,
    /// The depth of the IO rings to create. This influences the level of IO
    /// concurrency. A higher ring depth allows a shard to submit a
    /// greater number of IO requests to the kernel at once.
    ring_depth: usize,
    /// How often to yield to other task queues
    preempt_timer_duration: Duration,
    /// Indicates a policy by which [`LocalExecutor`]s are bound to CPUs.
    placement: PoolPlacement,
    /// Whether to record the latencies of individual IO requests
    record_io_latencies: bool,
    /// The placement policy of the blocking thread pools. Each executor has
    /// its own pool. Defaults to 1 thread per pool, bound using the same
    /// placement strategy as its host executor
    blocking_thread_pool_placement: PoolPlacement,
    /// Factory function to generate the stall detection handler.
    /// [`stall::DefaultStallDetectionHandler`] installs a signal handler for
    /// [`nix::libc::SIGUSR1`], so stall detection is disabled by default
    /// (`None`).
    handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
}
777
778impl fmt::Debug for LocalExecutorPoolBuilder {
779    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
780        f.debug_struct("LocalExecutorPoolBuilder")
781            .field("spin_before_park", &self.spin_before_park)
782            .field("name", &self.name)
783            .field("io_memory", &self.io_memory)
784            .field("ring_depth", &self.ring_depth)
785            .field("preempt_timer_duration", &self.preempt_timer_duration)
786            .field("record_io_latencies", &self.record_io_latencies)
787            .field(
788                "blocking_thread_pool_placement",
789                &self.blocking_thread_pool_placement,
790            )
791            .finish_non_exhaustive()
792    }
793}
794
795impl LocalExecutorPoolBuilder {
796    /// Generates the base configuration for spawning a pool of
797    /// [`LocalExecutor`]s, from which configuration methods can be chained.
798    /// The method's only argument is the [`PoolPlacement`] policy by which
799    /// [`LocalExecutor`]s are bound to the machine's hardware topology. i.e.
800    /// how many and which CPUs to use.
801    pub fn new(placement: PoolPlacement) -> Self {
802        Self {
803            spin_before_park: None,
804            name: String::from(DEFAULT_EXECUTOR_NAME),
805            io_memory: DEFAULT_IO_MEMORY,
806            ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
807            preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
808            placement: placement.clone(),
809            record_io_latencies: false,
810            blocking_thread_pool_placement: placement.shrink_to(1),
811            handler_gen: None,
812        }
813    }
814
815    /// Please see documentation under
816    /// [`LocalExecutorBuilder::spin_before_park`] for details.  The setting
817    /// is applied to all executors in the pool.
818    #[must_use = "The builder must be built to be useful"]
819    pub fn spin_before_park(mut self, spin: Duration) -> Self {
820        self.spin_before_park = Some(spin);
821        self
822    }
823
824    /// Please see documentation under [`LocalExecutorBuilder::name`] for
825    /// details. The setting is applied to all executors in the pool. Note
826    /// that when a thread is spawned, the `name` is combined with a hyphen
827    /// and numeric id (e.g. `myname-1`) such that each thread has a unique
828    /// name.
829    #[must_use = "The builder must be built to be useful"]
830    pub fn name(mut self, name: &str) -> Self {
831        self.name = String::from(name);
832        self
833    }
834
835    /// Please see documentation under [`LocalExecutorBuilder::io_memory`] for
836    /// details.  The setting is applied to all executors in the pool.
837    #[must_use = "The builder must be built to be useful"]
838    pub fn io_memory(mut self, io_memory: usize) -> Self {
839        self.io_memory = io_memory;
840        self
841    }
842
843    /// Please see documentation under [`LocalExecutorBuilder::ring_depth`] for
844    /// details.  The setting is applied to all executors in the pool.
845    #[must_use = "The builder must be built to be useful"]
846    pub fn ring_depth(mut self, ring_depth: usize) -> Self {
847        assert!(ring_depth > 0, "ring depth should be strictly positive");
848        self.ring_depth = ring_depth;
849        self
850    }
851
852    /// Please see documentation under [`LocalExecutorBuilder::preempt_timer`]
853    /// for details.  The setting is applied to all executors in the pool.
854    #[must_use = "The builder must be built to be useful"]
855    pub fn preempt_timer(mut self, dur: Duration) -> Self {
856        self.preempt_timer_duration = dur;
857        self
858    }
859
860    /// Whether to record the latencies of individual IO requests as part of the
861    /// IO stats. Recording latency can be expensive. Disabled by default.
862    #[must_use = "The builder must be built to be useful"]
863    pub fn record_io_latencies(mut self, enabled: bool) -> Self {
864        self.record_io_latencies = enabled;
865        self
866    }
867
868    /// The placement policy of the blocking thread pool.
869    /// Defaults to one thread using the same placement strategy as the host
870    /// executor.
871    #[must_use = "The builder must be built to be useful"]
872    pub fn blocking_thread_pool_placement(mut self, placement: PoolPlacement) -> Self {
873        self.blocking_thread_pool_placement = placement;
874        self
875    }
876
877    /// Whether to detect stalls in unyielding tasks.
878    /// This method takes a closure of `handler_gen`, which will be called on
879    /// each new thread to generate the stall detection handler to be used in
880    /// that executor. [`stall::DefaultStallDetectionHandler`] installs a signal
881    /// handler for [`nix::libc::SIGUSR1`], so is disabled by default.
882    /// # Examples
883    ///
884    /// ```
885    /// use glommio::{
886    ///     timer::Timer,
887    ///     DefaultStallDetectionHandler,
888    ///     LocalExecutorPoolBuilder,
889    ///     PoolPlacement,
890    /// };
891    ///
892    /// let local_ex = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
893    ///     .detect_stalls(Some(Box::new(|| Box::new(DefaultStallDetectionHandler {}))))
894    ///     .on_all_shards(move || async {
895    ///         Timer::new(std::time::Duration::from_millis(100)).await;
896    ///         println!("Hello world!");
897    ///     })
898    ///     .expect("failed to spawn local executors")
899    ///     .join_all();
900    /// ```
901    #[must_use = "The builder must be built to be useful"]
902    pub fn detect_stalls(
903        mut self,
904        handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
905    ) -> Self {
906        self.handler_gen = handler_gen;
907        self
908    }
909
910    /// Spawn a pool of [`LocalExecutor`]s in a new thread according to the
911    /// [`PoolPlacement`] policy, which is `Unbound` by default.
912    ///
913    /// This method is the pool equivalent of [`LocalExecutorBuilder::spawn`].
914    ///
915    /// The method takes a closure `fut_gen` which will be called on each new
916    /// thread to obtain the [`Future`] to be executed there.
917    ///
918    /// # Panics
919    ///
920    /// The newly spawned thread panics if creating the executor fails. If you
921    /// need more fine-grained error handling consider initializing those
922    /// entities manually.
923    #[must_use = "This spawns executors on multiple threads; threads may fail to spawn or you may \
924                  need to call `PoolThreadHandles::join_all()` to keep the main thread alive"]
925    pub fn on_all_shards<G, F, T>(self, fut_gen: G) -> Result<PoolThreadHandles<T>>
926    where
927        G: FnOnce() -> F + Clone + Send + 'static,
928        F: Future<Output = T> + 'static,
929        T: Send + 'static,
930    {
931        let mut handles = PoolThreadHandles::new();
932        let nr_shards = self.placement.executor_count();
933        let mut cpu_set_gen = placement::CpuSetGenerator::pool(self.placement.clone())?;
934        let latch = Latch::new(nr_shards);
935
936        for _ in 0..nr_shards {
937            match self.spawn_thread(&mut cpu_set_gen, &latch, fut_gen.clone()) {
938                Ok(handle) => handles.push(handle),
939                Err(err) => {
940                    handles.join_all();
941                    return Err(err);
942                }
943            }
944        }
945
946        Ok(handles)
947    }
948
949    /// Spawns a thread
950    fn spawn_thread<G, F, T>(
951        &self,
952        cpu_set_gen: &mut placement::CpuSetGenerator,
953        latch: &Latch,
954        fut_gen: G,
955    ) -> Result<JoinHandle<Result<T>>>
956    where
957        G: FnOnce() -> F + Clone + Send + 'static,
958        F: Future<Output = T> + 'static,
959        T: Send + 'static,
960    {
961        // NOTE: `self.placement` was `std::mem::take`en in `Self::on_all_shards`; you
962        // should no longer rely on its value at this point
963        let cpu_binding = cpu_set_gen.next().cpu_binding();
964        let notifier = sys::new_sleep_notifier()?;
965        let name = format!("{}-{}", self.name, notifier.id());
966        let handle = Builder::new().name(name).spawn({
967            let io_memory = self.io_memory;
968            let ring_depth = self.ring_depth;
969            let preempt_timer_duration = self.preempt_timer_duration;
970            let spin_before_park = self.spin_before_park;
971            let record_io_latencies = self.record_io_latencies;
972            let blocking_thread_pool_placement = self.blocking_thread_pool_placement.clone();
973            let detect_stalls = self.handler_gen.as_ref().map(|x| (*x.deref())());
974            let latch = Latch::clone(latch);
975
976            move || {
977                // only allow the thread to create the `LocalExecutor` if all other threads that
978                // are supposed to be created by the pool builder were successfully spawned
979                if latch.arrive_and_wait() == LatchState::Ready {
980                    let mut le = LocalExecutor::new(
981                        notifier,
982                        cpu_binding,
983                        LocalExecutorConfig {
984                            io_memory,
985                            ring_depth,
986                            preempt_timer: preempt_timer_duration,
987                            record_io_latencies,
988                            spin_before_park,
989                            thread_pool_placement: blocking_thread_pool_placement,
990                            detect_stalls,
991                        },
992                    )?;
993                    le.init();
994                    le.run(async move { Ok(fut_gen().await) })
995                } else {
996                    // this `Err` isn't visible to the user; the pool builder directly returns an
997                    // `Err` from the `std::thread::Builder`
998                    Err(io::Error::new(io::ErrorKind::Other, "spawn failed").into())
999                }
1000            }
1001        });
1002
1003        match handle {
1004            Ok(h) => Ok(h),
1005            Err(e) => {
1006                // The `std::thread::Builder` was unable to spawn the thread and retuned an
1007                // `Err`, so we notify other threads to let them know they
1008                // should not proceed with constructing their `LocalExecutor`s
1009                latch.cancel().expect("unreachable: latch was ready");
1010
1011                Err(e.into())
1012            }
1013        }
1014    }
1015}
1016
/// Holds a collection of [`JoinHandle`]s.
///
/// This struct is returned by [`LocalExecutorPoolBuilder::on_all_shards`].
/// Each handle belongs to one thread of the pool; joining it yields the
/// `Result` returned by that thread.
#[derive(Debug)]
pub struct PoolThreadHandles<T> {
    // one handle per spawned pool thread, in the order in which they were pushed
    handles: Vec<JoinHandle<Result<T>>>,
}
1024
1025impl<T> PoolThreadHandles<T> {
1026    fn new() -> Self {
1027        Self {
1028            handles: Vec::new(),
1029        }
1030    }
1031
1032    fn push(&mut self, handle: JoinHandle<Result<T>>) {
1033        self.handles.push(handle)
1034    }
1035
1036    /// Obtain a reference to the `JoinHandle`s.
1037    pub fn handles(&self) -> &Vec<JoinHandle<Result<T>>> {
1038        &self.handles
1039    }
1040
1041    /// Calls [`JoinHandle::join`] on all handles.
1042    pub fn join_all(self) -> Vec<Result<T>> {
1043        self.handles
1044            .into_iter()
1045            .map(|h| {
1046                match h.join() {
1047                    Ok(ok @ Ok(_)) => ok,
1048                    // this variant is unreachable since `Err` is only returned from a thread if
1049                    // another thread failed to spawn; `LocalExecutorPoolBuilder::on_all_shards`
1050                    // returns an immediate `Err` if any thread fails to spawn, so
1051                    // `PoolThreadHandles` would never be created
1052                    Ok(err @ Err(_)) => err,
1053                    Err(e) => Err(GlommioError::BuilderError(BuilderErrorKind::ThreadPanic(e))),
1054                }
1055            })
1056            .collect::<Vec<_>>()
1057    }
1058}
1059
1060pub(crate) fn maybe_activate(tq: Rc<RefCell<TaskQueue>>) {
1061    #[cfg(not(feature = "native-tls"))]
1062    LOCAL_EX.with(|local_ex| {
1063        let mut queues = local_ex.queues.borrow_mut();
1064        queues.maybe_activate(tq);
1065    });
1066
1067    #[cfg(feature = "native-tls")]
1068    unsafe {
1069        let mut queues = LOCAL_EX
1070            .as_ref()
1071            .expect("this thread doesn't have a LocalExecutor running")
1072            .queues
1073            .borrow_mut();
1074        queues.maybe_activate(tq);
1075    };
1076}
1077
/// Settings consumed by `LocalExecutor::new` when constructing an executor.
pub struct LocalExecutorConfig {
    /// IO memory setting handed to the executor's reactor.
    pub io_memory: usize,
    /// Ring depth handed to the executor's reactor.
    pub ring_depth: usize,
    /// Preempt timer duration handed to the executor's queues.
    pub preempt_timer: Duration,
    /// Whether the reactor records the latencies of individual IO requests.
    pub record_io_latencies: bool,
    /// How long to spin before parking; reset to `None` when the executor is
    /// created without a CPU binding.
    pub spin_before_park: Option<Duration>,
    /// Placement used to create the executor's blocking thread pool.
    pub thread_pool_placement: PoolPlacement,
    /// Handler used to build the executor's stall detector, if any.
    pub detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}
1087
/// Single-threaded executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```
/// use glommio::LocalExecutor;
///
/// let local_ex = LocalExecutor::default();
///
/// local_ex.run(async {
///     println!("Hello world!");
/// });
/// ```
///
/// In many cases, use of [`LocalExecutorBuilder`] will provide more
/// configuration options and more ergonomic methods. See
/// [`LocalExecutorBuilder::spawn`] for examples.
///
/// [`LocalExecutorBuilder`]: struct.LocalExecutorBuilder.html
///
/// [`LocalExecutorBuilder::spawn`]:
/// struct.LocalExecutorBuilder.html#method.spawn
#[derive(Debug)]
pub struct LocalExecutor {
    // task queues known to this executor, plus scheduling state and statistics
    queues: Rc<RefCell<ExecutorQueues>>,
    // used to poll IO and to park the thread when there is nothing to run
    parker: parking::Parker,
    // identifier of this executor, taken from the sleep notifier it was created with
    id: usize,
    // reactor of this executor
    reactor: Rc<reactor::Reactor>,
    // stall detector, if stall detection is enabled
    stall_detector: RefCell<Option<StallDetector>>,
}
1120
1121impl LocalExecutor {
1122    fn get_reactor(&self) -> Rc<Reactor> {
1123        self.reactor.clone()
1124    }
1125
1126    fn init(&mut self) {
1127        let io_requirements = IoRequirements::new(Latency::NotImportant, 0);
1128        self.queues.borrow_mut().available_executors.insert(
1129            0,
1130            TaskQueue::new(
1131                Default::default(),
1132                "default",
1133                Shares::Static(1000),
1134                io_requirements,
1135            ),
1136        );
1137    }
1138
1139    fn new(
1140        notifier: Arc<sys::SleepNotifier>,
1141        cpu_binding: Option<impl IntoIterator<Item = usize>>,
1142        mut config: LocalExecutorConfig,
1143    ) -> Result<LocalExecutor> {
1144        let blocking_thread =
1145            BlockingThreadPool::new(config.thread_pool_placement, notifier.clone())?;
1146
1147        // Linux's default memory policy is "local allocation" which allocates memory
1148        // on the NUMA node containing the CPU where the allocation takes place.
1149        // Hence, we bind to a CPU in the provided CPU set before allocating any
1150        // memory for the `LocalExecutor`, thereby allowing any access to these
1151        // data structures to occur on a local NUMA node (nevertheless, for some
1152        // `Placement` variants a CPU set could span multiple NUMA nodes).
1153        // For additional information see:
1154        // https://www.kernel.org/doc/html/latest/admin-guide/mm/numa_memory_policy.html
1155        match cpu_binding {
1156            Some(cpu_set) => bind_to_cpu_set(cpu_set)?,
1157            None => config.spin_before_park = None,
1158        }
1159        let p = parking::Parker::new();
1160        let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park);
1161        let id = notifier.id();
1162        trace!(id = id, "Creating executor");
1163        Ok(LocalExecutor {
1164            queues: Rc::new(RefCell::new(queues)),
1165            parker: p,
1166            id,
1167            reactor: Rc::new(reactor::Reactor::new(
1168                notifier,
1169                config.io_memory,
1170                config.ring_depth,
1171                config.record_io_latencies,
1172                blocking_thread,
1173            )?),
1174            stall_detector: RefCell::new(
1175                config
1176                    .detect_stalls
1177                    .map(|x| StallDetector::new(id, x))
1178                    .transpose()?,
1179            ),
1180        })
1181    }
1182
1183    /// Enable or disable task stall detection at runtime
1184    ///
1185    /// # Examples
1186    /// ```
1187    /// use glommio::{DefaultStallDetectionHandler, LocalExecutor};
1188    ///
1189    /// let local_ex =
1190    ///     LocalExecutor::default().detect_stalls(Some(Box::new(DefaultStallDetectionHandler {})));
1191    /// ```
1192    pub fn detect_stalls(
1193        &self,
1194        handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
1195    ) -> Result<()> {
1196        self.stall_detector.replace(
1197            handler
1198                .map(|x| StallDetector::new(self.id, x))
1199                .transpose()?,
1200        );
1201        Ok(())
1202    }
1203
    /// Returns a unique identifier for this Executor.
    ///
    /// This is the identifier of the sleep notifier the executor was created
    /// with.
    ///
    /// # Examples
    /// ```
    /// use glommio::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::default();
    /// println!("My ID: {}", local_ex.id());
    /// ```
    pub fn id(&self) -> usize {
        self.id
    }
1216
1217    fn create_task_queue<S>(&self, shares: Shares, latency: Latency, name: S) -> TaskQueueHandle
1218    where
1219        S: Into<String>,
1220    {
1221        let index = {
1222            let mut ex = self.queues.borrow_mut();
1223            let index = ex.executor_index;
1224            ex.executor_index += 1;
1225            index
1226        };
1227
1228        let io_requirements = IoRequirements::new(latency, index);
1229        let tq = TaskQueue::new(TaskQueueHandle { index }, name, shares, io_requirements);
1230
1231        self.queues
1232            .borrow_mut()
1233            .available_executors
1234            .insert(index, tq);
1235        TaskQueueHandle { index }
1236    }
1237
1238    /// Removes a task queue.
1239    ///
1240    /// The task queue cannot be removed if there are still pending tasks.
1241    pub fn remove_task_queue(&self, handle: TaskQueueHandle) -> Result<()> {
1242        let mut queues = self.queues.borrow_mut();
1243
1244        let queue_entry = queues.available_executors.entry(handle.index);
1245        if let Entry::Occupied(entry) = queue_entry {
1246            let tq = entry.get();
1247            if tq.borrow().is_active() {
1248                return Err(GlommioError::queue_still_active(handle.index));
1249            }
1250
1251            entry.remove();
1252            return Ok(());
1253        }
1254        Err(GlommioError::queue_not_found(handle.index))
1255    }
1256
1257    fn get_queue(&self, handle: &TaskQueueHandle) -> Option<Rc<RefCell<TaskQueue>>> {
1258        self.queues
1259            .borrow()
1260            .available_executors
1261            .get(&handle.index)
1262            .cloned()
1263    }
1264
1265    fn current_task_queue(&self) -> TaskQueueHandle {
1266        self.queues
1267            .borrow()
1268            .active_executing
1269            .as_ref()
1270            .unwrap()
1271            .borrow()
1272            .stats
1273            .index
1274    }
1275
1276    fn mark_me_for_yield(&self) {
1277        let queues = self.queues.borrow();
1278        let mut me = queues.active_executing.as_ref().unwrap().borrow_mut();
1279        me.yielded = true;
1280    }
1281
1282    fn spawn<T>(&self, future: impl Future<Output = T>) -> multitask::Task<T> {
1283        let tq = self
1284            .queues
1285            .borrow()
1286            .active_executing
1287            .clone() // this clone is cheap because we clone an `Option<Rc<_>>`
1288            .or_else(|| self.get_queue(&TaskQueueHandle { index: 0 }))
1289            .unwrap();
1290
1291        let id = self.id;
1292        let ex = tq.borrow().ex.clone();
1293        ex.spawn_and_run(id, tq, future)
1294    }
1295
1296    fn spawn_into<T, F>(&self, future: F, handle: TaskQueueHandle) -> Result<multitask::Task<T>>
1297    where
1298        F: Future<Output = T>,
1299    {
1300        let tq = self
1301            .get_queue(&handle)
1302            .ok_or_else(|| GlommioError::queue_not_found(handle.index))?;
1303        let ex = tq.borrow().ex.clone();
1304        let id = self.id;
1305
1306        // can't run right away, because we need to cross into a different task queue
1307        Ok(ex.spawn_and_schedule(id, tq, future))
1308    }
1309
    /// Returns the preempt timer duration stored in the executor's queues.
    fn preempt_timer_duration(&self) -> Duration {
        self.queues.borrow().preempt_timer_duration
    }
1313
    /// Returns the spin-before-park duration stored in the executor's queues,
    /// if one is set.
    fn spin_before_park(&self) -> Option<Duration> {
        self.queues.borrow().spin_before_park
    }
1317
    /// Returns whether the reactor reports that the running task queue should
    /// be preempted.
    #[inline(always)]
    pub(crate) fn need_preempt(&self) -> bool {
        self.reactor.need_preempt()
    }
1322
1323    fn run_task_queues(&self) -> bool {
1324        let mut ran = false;
1325        loop {
1326            self.reactor.sys.install_eventfd();
1327            if self.need_preempt() {
1328                break;
1329            }
1330            if !self.run_one_task_queue() {
1331                return false;
1332            } else {
1333                ran = true;
1334            }
1335        }
1336        ran
1337    }
1338
    /// Picks the next candidate from the active task queues and runs its
    /// tasks until preemption is needed, the queue yields or the queue has no
    /// more tasks.
    ///
    /// Returns `false` if there was no active task queue to run and `true`
    /// otherwise. The `scheduler_runs` statistic is incremented in both cases.
    fn run_one_task_queue(&self) -> bool {
        let mut tq = self.queues.borrow_mut();
        let candidate = tq.active_executors.pop();
        tq.stats.scheduler_runs += 1;

        if candidate.is_none() {
            return false;
        }

        let queue = candidate.unwrap();
        tq.active_executing = Some(queue.clone());
        // release the borrow on the executor queues before any task runs
        drop(tq);

        let time = {
            let now = Instant::now();
            let mut queue_ref = queue.borrow_mut();
            queue_ref.prepare_to_run(now);
            self.reactor
                .inform_io_requirements(queue_ref.io_requirements);
            now
        };

        let (runtime, tasks_executed_this_loop) = {
            let detector = self.stall_detector.borrow();
            // `guard` is `Some` only when a stall detector is installed; it is
            // dropped explicitly once the tasks of this queue have been run
            let guard = detector.as_ref().map(|x| {
                let queue = queue.borrow_mut();
                x.enter_task_queue(
                    queue.stats.index,
                    queue.name.clone(),
                    time,
                    self.preempt_timer_duration(),
                )
            });

            let mut tasks_executed_this_loop = 0;
            loop {
                let mut queue_ref = queue.borrow_mut();
                if self.need_preempt() || queue_ref.yielded() {
                    break;
                }

                if let Some(r) = queue_ref.get_task() {
                    // the task queue must not be borrowed while the task runs
                    drop(queue_ref);
                    r.run();
                    tasks_executed_this_loop += 1;
                } else {
                    break;
                }
            }
            let elapsed = time.elapsed();
            drop(guard);
            (elapsed, tasks_executed_this_loop)
        };

        let (need_repush, vruntime) = {
            let mut state = queue.borrow_mut();
            let last_vruntime = state.account_vruntime(runtime);
            (state.is_active(), last_vruntime)
        };

        let mut tq = self.queues.borrow_mut();
        tq.active_executing = None;
        tq.stats.executor_runtime += runtime;
        tq.stats.tasks_executed += tasks_executed_this_loop;
        let vruntime = match vruntime {
            Some(x) => x,
            None => {
                // `account_vruntime` produced no value: restart the vruntime of every
                // known task queue from zero
                // NOTE(review): presumably this is the vruntime overflow case - confirm
                for queue in tq.available_executors.values() {
                    let mut q = queue.borrow_mut();
                    q.vruntime = 0;
                }
                0
            }
        };

        if need_repush {
            // the queue is still active, so it goes back among the candidates
            tq.active_executors.push(queue);
        } else {
            tq.reevaluate_preempt_timer();
        }

        // Compute the smallest vruntime out of all the active task queues
        // This value is used to set the vruntime of deactivated task queues when they
        // are woken up.
        tq.default_vruntime = tq
            .active_executors
            .peek()
            .map(|x| x.borrow().vruntime)
            .unwrap_or(vruntime);

        true
    }
1431
    /// Runs the executor until the given future completes.
    ///
    /// The future is spawned into the default task queue. The executor then
    /// alternates between polling IO and running task queues, and spins or
    /// parks when there is nothing left to run, until the future's result is
    /// available.
    ///
    /// # Panics
    ///
    /// Panics if a `LocalExecutor` is already running on this thread, if the
    /// default task queue does not exist, or if polling IO or parking fails.
    ///
    /// # Examples
    ///
    /// ```
    /// use glommio::{LocalExecutor, Task};
    ///
    /// let local_ex = LocalExecutor::default();
    ///
    /// let res = local_ex.run(async {
    ///     let task = glommio::spawn_local(async { 1 + 2 });
    ///     task.await * 2
    /// });
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let run = |this: &Self| {
            // this waker is never exposed in the public interface and is only used to check
            // whether the task's `JoinHandle` is `Ready`
            let waker = dummy_waker();
            let cx = &mut Context::from_waker(&waker);

            // no configured spin time means a zero duration
            let spin_before_park = self.spin_before_park().unwrap_or_default();

            let future = this
                .spawn_into(future, TaskQueueHandle::default())
                .unwrap()
                .detach();
            pin!(future);

            // start of the interval that is next added to `total_runtime`
            let mut pre_time = Instant::now();
            loop {
                if let Poll::Ready(t) = future.as_mut().poll(cx) {
                    // can't be canceled, and join handle is None only upon
                    // cancellation or panic. So in case of panic this just propagates
                    let cur_time = Instant::now();
                    this.queues.borrow_mut().stats.total_runtime += cur_time - pre_time;
                    break t.unwrap();
                }

                // We want to do I/O before we call run_task_queues,
                // for the benefit of the latency ring. If there are pending
                // requests that are latency sensitive we want them out of the
                // ring ASAP (before we run the task queues). We will also use
                // the opportunity to install the timer.
                this.parker
                    .poll_io(|| Some(this.preempt_timer_duration()))
                    .expect("Failed to poll io! This is actually pretty bad!");

                // run user code
                let run = this.run_task_queues();

                // account for runtime and poll/sleep if possible
                let cur_time = Instant::now();
                this.queues.borrow_mut().stats.total_runtime += cur_time - pre_time;
                pre_time = cur_time;
                if !run {
                    if let Poll::Ready(t) = future.as_mut().poll(cx) {
                        // It may be that we just became ready now that the task queue
                        // is exhausted. But if we sleep (park) we'll never know so we
                        // test again here. We can't test *just* here because the main
                        // future is probably the one setting up the task queues and etc.
                        break t.unwrap();
                    } else {
                        // spin on the reactor until it reports true, parking once we
                        // have been spinning for longer than `spin_before_park`
                        while !this.reactor.spin_poll_io().unwrap() {
                            if pre_time.elapsed() > spin_before_park {
                                this.parker
                                    .park()
                                    .expect("Failed to park! This is actually pretty bad!");
                                break;
                            }
                        }
                        // reset the timer so that the time spent spinning or parked is
                        // not counted as runtime
                        pre_time = Instant::now();
                    }
                }
            }
        };

        // register this executor as the one running on the current thread for the
        // duration of `run`
        #[cfg(not(feature = "native-tls"))]
        {
            assert!(
                !LOCAL_EX.is_set(),
                "There is already an LocalExecutor running on this thread"
            );
            LOCAL_EX.set(self, || run(self))
        }

        #[cfg(feature = "native-tls")]
        unsafe {
            assert!(
                LOCAL_EX.is_null(),
                "There is already an LocalExecutor running on this thread"
            );

            // `LOCAL_EX` is reset to null by the deferred statement
            defer!(LOCAL_EX = std::ptr::null());
            LOCAL_EX = self as *const Self;
            run(self)
        }
    }
1533}
1534
1535/// Spawns a single-threaded executor with default settings on the current
1536/// thread.
1537///
1538/// This will create a executor using default parameters of
1539/// `LocalExecutorBuilder`, if you want to further customize it, use this API
1540/// instead.
1541///
1542/// # Panics
1543///
1544/// Panics if creating the executor fails; use `LocalExecutorBuilder::make` to
1545/// recover from such errors.
1546///
1547/// # Examples
1548///
1549/// ```
1550/// use glommio::LocalExecutor;
1551///
1552/// let local_ex = LocalExecutor::default();
1553/// ```
1554impl Default for LocalExecutor {
1555    fn default() -> Self {
1556        LocalExecutorBuilder::new(Placement::Unbound)
1557            .make()
1558            .unwrap()
1559    }
1560}
1561
/// A spawned future that can be detached
///
/// Tasks are also futures themselves and yield the output of the spawned
/// future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To
/// cancel a task a bit more gracefully and wait until it stops running, use the
/// [`cancel()`][`Task::cancel()`] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also
/// causes a panic.
///
/// # Examples
///
/// ```
/// # use glommio::{LocalExecutor, Task};
/// #
/// # let ex = LocalExecutor::default();
/// #
/// # ex.run(async {
/// let task = glommio::spawn_local(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// assert_eq!(task.await, 3);
/// # });
/// ```
/// Note that there is no guarantee of ordering when reasoning about when a
/// task runs, as that is an implementation detail.
///
/// In particular, acquiring a borrow and holding it across a task spawn may
/// sometimes work and sometimes panic, depending on scheduling decisions, so
/// it is still illegal.
///
///
/// ```no_run
/// # use glommio::{LocalExecutor, Task};
/// # use std::rc::Rc;
/// # use std::cell::RefCell;
/// #
/// # let ex = LocalExecutor::default();
/// #
/// # ex.run(async {
/// let example = Rc::new(RefCell::new(0));
/// let exclone = example.clone();
///
/// let mut ex_mut = example.borrow_mut();
/// *ex_mut = 1;
///
/// let task = glommio::spawn_local(async move {
///     let ex = exclone.borrow();
///     println!("Current value: {ex}");
/// });
///
/// // This is fine if `task` executes after the current task, but will panic if
/// // it preempts the current task and executes first. This is therefore invalid.
/// *ex_mut = 2;
/// drop(ex_mut);
///
/// task.await;
/// # });
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
1628
1629impl<T> Task<T> {
1630    /// Detaches the task to let it keep running in the background.
1631    ///
1632    /// # Examples
1633    ///
1634    /// ```
1635    /// use futures_lite::future;
1636    /// use glommio::{timer::Timer, LocalExecutor};
1637    ///
1638    /// let ex = LocalExecutor::default();
1639    /// ex.run(async {
1640    ///     glommio::spawn_local(async {
1641    ///         loop {
1642    ///             println!("I'm a background task looping forever.");
1643    ///             glommio::executor().yield_task_queue_now().await;
1644    ///         }
1645    ///     })
1646    ///     .detach();
1647    ///     Timer::new(std::time::Duration::from_micros(100)).await;
1648    /// })
1649    /// ```
1650    pub fn detach(self) -> task::JoinHandle<T> {
1651        self.0.detach()
1652    }
1653
1654    /// Cancels the task and waits for it to stop running.
1655    ///
1656    /// Returns the task's output if it was completed just before it got
1657    /// canceled, or [`None`] if it didn't complete.
1658    ///
1659    /// While it's possible to simply drop the [`Task`] to cancel it, this is a
1660    /// cleaner way of canceling because it also waits for the task to stop
1661    /// running.
1662    ///
1663    /// # Examples
1664    ///
1665    /// ```
1666    /// use futures_lite::future;
1667    /// use glommio::LocalExecutor;
1668    ///
1669    /// let ex = LocalExecutor::default();
1670    ///
1671    /// ex.run(async {
1672    ///     let task = glommio::spawn_local(async {
1673    ///         loop {
1674    ///             println!("Even though I'm in an infinite loop, you can still cancel me!");
1675    ///             future::yield_now().await;
1676    ///         }
1677    ///     });
1678    ///
1679    ///     task.cancel().await;
1680    /// });
1681    /// ```
1682    pub async fn cancel(self) -> Option<T> {
1683        self.0.cancel().await
1684    }
1685}
1686
1687impl<T> Future for Task<T> {
1688    type Output = T;
1689
1690    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1691        Pin::new(&mut self.0).poll(cx)
1692    }
1693}
1694
1695/// A spawned future that cannot be detached, and has a predictable lifetime.
1696///
1697/// Because their lifetimes are bounded, you don't need to make sure that data
1698/// you pass to the `ScopedTask` is `'static`, which can be cheaper (no need to
1699/// reference count). If you, however, would like to `.detach` this task and
1700/// have it run in the background, consider using [`Task`] instead.
1701///
1702/// Tasks are also futures themselves and yield the output of the spawned
1703/// future.
1704///
1705/// When a task is dropped, its gets canceled and won't be polled again. To
1706/// cancel a task a bit more gracefully and wait until it stops running, use the
1707/// [`cancel()`][`ScopedTask::cancel()`] method.
1708///
1709/// Tasks that panic get immediately canceled. Awaiting a canceled task also
1710/// causes a panic.
1711///
1712/// # Safety
1713///
1714/// `ScopedTask` is safe to use so long as it is guaranteed to be either awaited
1715/// or dropped. Rust does not guarantee that destructors will be called, and if
1716/// they are not, `ScopedTask`s can be kept alive after the scope is terminated.
1717///
1718/// Typically, the only situations in which `drop` is not executed are:
1719///
1720/// * If you manually choose not to, with [`std::mem::forget`] or
1721///   [`ManuallyDrop`].
1722/// * If cyclic reference counts prevents the task from being destroyed.
1723///
1724/// If you believe any of the above situations are present (the first one is,
1725/// of course, considerably easier to spot), avoid using the `ScopedTask`.
1726///
1727/// # Examples
1728///
1729/// ```
1730/// use glommio::LocalExecutor;
1731///
1732/// let ex = LocalExecutor::default();
1733///
1734/// ex.run(async {
1735///     let a = 2;
1736///     let task = unsafe {
1737///         glommio::spawn_scoped_local(async {
1738///             println!("Hello from a task!");
1739///             1 + a // this is a reference, and it works just fine
1740///         })
1741///     };
1742///
1743///     assert_eq!(task.await, 3);
1744/// });
1745/// ```
1746/// The usual borrow checker rules apply. A [`ScopedTask`] can acquire a mutable
1747/// reference to a variable just fine:
1748///
1749/// ```
1750/// # use glommio::{LocalExecutor};
1751/// #
1752/// # let ex = LocalExecutor::default();
1753/// # ex.run(async {
1754/// let mut a = 2;
1755/// let task = unsafe {
1756///     glommio::spawn_scoped_local(async {
1757///         a = 3;
1758///     })
1759/// };
1760/// task.await;
1761/// assert_eq!(a, 3);
1762/// # });
1763/// ```
1764///
1765/// But until the task completes, the reference is mutably held, so we can no
1766/// longer immutably reference it:
1767///
1768/// ```compile_fail
1769/// # use glommio::LocalExecutor;
1770/// #
1771/// # let ex = LocalExecutor::default();
1772/// # ex.run(async {
1773/// let mut a = 2;
1774/// let task = unsafe {
1775///     glommio::scoped_local(async {
1776///         a = 3;
1777///     })
1778/// };
1779/// assert_eq!(a, 3); // task hasn't completed yet!
1780/// task.await;
1781/// # });
1782/// ```
1783///
1784/// You can still use [`Cell`] and [`RefCell`] normally to work around this.
1785/// Just keep in mind that there is no guarantee of ordering for execution of
1786/// tasks, and if the task has not yet finished the value may or may not have
1787/// changed (as with any interior mutability)
1788///
1789/// ```
1790/// # use glommio::{LocalExecutor};
1791/// # use std::cell::Cell;
1792/// #
1793/// # let ex = LocalExecutor::default();
1794/// # ex.run(async {
1795/// let a = Cell::new(2);
1796/// let task = unsafe {
1797///     glommio::spawn_scoped_local(async {
1798///         a.set(3);
1799///     })
1800/// };
1801///
1802/// assert!(a.get() == 3 || a.get() == 2); // impossible to know if it will be 2 or 3
1803/// task.await;
1804/// assert_eq!(a.get(), 3); // The task finished now.
1805/// //
1806/// # });
1807/// ```
1808///
1809/// The following code, however, will access invalid memory as drop is never
1810/// executed
1811///
1812/// ```no_run
1813/// # use glommio::{LocalExecutor};
1814/// # use std::cell::Cell;
1815/// #
1816/// # let ex = LocalExecutor::default();
1817/// # ex.run(async {
1818/// {
1819///     let a = &mut "mayhem";
1820///     let task = unsafe {
1821///         glommio::spawn_scoped_local(async {
1822///             *a = "doom";
1823///         })
1824///     };
1825///     std::mem::forget(task);
1826/// }
1827/// # });
1828/// ```
1829
1830/// [`Task`]: crate::Task
1831/// [`Cell`]: std::cell::Cell
1832/// [`RefCell`]: std::cell::RefCell
1833/// [`std::mem::forget`]: std::mem::forget
1834/// [`ManuallyDrop`]: std::mem::ManuallyDrop
1835#[must_use = "scoped tasks get canceled when dropped, use a standard Task and `.detach()` to run \
1836              them in the background"]
1837#[derive(Debug)]
1838pub struct ScopedTask<'a, T>(multitask::Task<T>, PhantomData<&'a T>);
1839
1840impl<'a, T> ScopedTask<'a, T> {
1841    /// Cancels the task and waits for it to stop running.
1842    ///
1843    /// Returns the task's output if it was completed just before it got
1844    /// canceled, or [`None`] if it didn't complete.
1845    ///
1846    /// While it's possible to simply drop the [`ScopedTask`] to cancel it, this
1847    /// is a cleaner way of canceling because it also waits for the task to
1848    /// stop running.
1849    ///
1850    /// # Examples
1851    ///
1852    /// ```
1853    /// use futures_lite::future;
1854    /// use glommio::LocalExecutor;
1855    ///
1856    /// let ex = LocalExecutor::default();
1857    ///
1858    /// ex.run(async {
1859    ///     let task = unsafe {
1860    ///         glommio::spawn_scoped_local(async {
1861    ///             loop {
1862    ///                 println!("Even though I'm in an infinite loop, you can still cancel me!");
1863    ///                 future::yield_now().await;
1864    ///             }
1865    ///         })
1866    ///     };
1867    ///
1868    ///     task.cancel().await;
1869    /// });
1870    /// ```
1871    pub async fn cancel(self) -> Option<T> {
1872        self.0.cancel().await
1873    }
1874}
1875
1876impl<'a, T> Future for ScopedTask<'a, T> {
1877    type Output = T;
1878
1879    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1880        Pin::new(&mut self.0).poll(cx)
1881    }
1882}
1883
1884/// Conditionally yields the current task queue. The scheduler may then
1885/// process other task queues according to their latency requirements.
1886/// If a call to this function results in the current queue to yield,
1887/// then the calling task is moved to the back of the yielded task
1888/// queue.
1889///
1890/// Under which condition this function yield is an implementation detail
1891/// subject to change, but it will always be somehow related to the latency
1892/// guarantees that the task queues want to uphold in their
1893/// `Latency::Matters` parameter (or `Latency::NotImportant`).
1894///
1895/// This function is the central mechanism of task cooperation in Glommio
1896/// and should be preferred over unconditional yielding methods like
1897/// [`ExecutorProxy::yield_now`] and
1898/// [`ExecutorProxy::yield_task_queue_now`].
1899#[inline(always)]
1900pub async fn yield_if_needed() {
1901    executor().yield_if_needed().await
1902}
1903
1904/// Spawns a task onto the current single-threaded executor.
1905///
1906/// If called from a [`LocalExecutor`], the task is spawned on it.
1907/// Otherwise, this method panics.
1908///
1909/// Note that there is no guarantee of when the spawned task is scheduled.
1910/// The current task can continue its execution or be preempted by the
1911/// newly spawned task immediately. See the documentation for the
1912/// top-level [`Task`] for examples.
1913///
1914/// Proxy to [`ExecutorProxy::spawn_local`]
1915///
1916/// # Examples
1917///
1918/// ```
1919/// use glommio::{LocalExecutor, Task};
1920///
1921/// let local_ex = LocalExecutor::default();
1922///
1923/// local_ex.run(async {
1924///     let task = glommio::spawn_local(async { 1 + 2 });
1925///     assert_eq!(task.await, 3);
1926/// });
1927/// ```
1928pub fn spawn_local<T>(future: impl Future<Output = T> + 'static) -> Task<T>
1929where
1930    T: 'static,
1931{
1932    executor().spawn_local(future)
1933}
1934
1935/// Allocates a buffer that is suitable for using to write to Direct Memory
1936/// Access File (DMA). Please note that this implementation uses embedded buddy
1937/// allocator to speed up allocation of the memory chunks, but the same
1938/// allocator is used to server memory needed to write/read data from `uring` so
1939/// probably that is not good idea to keep allocated memory for a long time.
1940/// If you want to keep allocated buffer for a long time please use
1941/// ['crate::allocate_dma_buffer_global'] instead.
1942/// Be careful when you use this buffer with DMA file, size and position of the
1943/// buffer should be properly aligned to the block size of the device where the
1944/// file is located
1945///
1946/// * `size` size of the requested buffer in bytes
1947///
1948/// [`DmaFile`]: crate::io::DmaFile
1949pub fn allocate_dma_buffer(size: usize) -> DmaBuffer {
1950    executor().reactor().alloc_dma_buffer(size)
1951}
1952
1953/// Allocates a buffer that is suitable for using to write to Direct Memory
1954/// Access File (DMA). If you do not plan to keep allocated buffer for a long
1955/// time please use ['crate::allocate_dma_buffer'] instead.
1956/// Be careful when you use this buffer with DMA file, size and position of the
1957/// buffer should be properly aligned to the block size of the device where the
1958/// file is located
1959///
1960/// * `size` size of the requested buffer in bytes
1961///
1962/// [`DmaFile`]: crate::io::DmaFile
1963pub fn allocate_dma_buffer_global(size: usize) -> DmaBuffer {
1964    DmaBuffer::new(size).unwrap()
1965}
1966
1967/// Spawns a task onto the current single-threaded executor, in a particular
1968/// task queue
1969///
1970/// If called from a [`LocalExecutor`], the task is spawned on it.
1971/// Otherwise, this method panics.
1972///
1973/// Note that there is no guarantee of when the spawned task is scheduled.
1974/// The current task can continue its execution or be preempted by the
1975/// newly spawned task immediately. See the documentation for the
1976/// top-level [`Task`] for examples.
1977///
1978/// Proxy to [`ExecutorProxy::spawn_local_into`]
1979///
1980/// # Examples
1981///
1982/// ```
1983/// # use glommio::{ LocalExecutor, Shares, Task};
1984///
1985/// # let local_ex = LocalExecutor::default();
1986/// # local_ex.run(async {
1987/// let handle = glommio::executor().create_task_queue(
1988///     Shares::default(),
1989///     glommio::Latency::NotImportant,
1990///     "test_queue",
1991/// );
1992/// let task = glommio::spawn_local_into(async { 1 + 2 }, handle).expect("failed to spawn task");
1993/// assert_eq!(task.await, 3);
1994/// # });
1995/// ```
1996pub fn spawn_local_into<T>(
1997    future: impl Future<Output = T> + 'static,
1998    handle: TaskQueueHandle,
1999) -> Result<Task<T>>
2000where
2001    T: 'static,
2002{
2003    executor().spawn_local_into(future, handle)
2004}
2005
2006/// Spawns a task onto the current single-threaded executor.
2007///
2008/// If called from a [`LocalExecutor`], the task is spawned on it.
2009///
2010/// Otherwise, this method panics.
2011///
2012/// Proxy to [`ExecutorProxy::spawn_scoped_local`]
2013///
2014/// # Safety
2015///
2016/// `ScopedTask` depends on `drop` running or `.await` being called for
2017/// safety. See the struct [`ScopedTask`] for details.
2018///
2019/// # Examples
2020///
2021/// ```
2022/// use glommio::LocalExecutor;
2023///
2024/// let local_ex = LocalExecutor::default();
2025///
2026/// local_ex.run(async {
2027///     let non_static = 2;
2028///     let task = unsafe { glommio::spawn_scoped_local(async { 1 + non_static }) };
2029///     assert_eq!(task.await, 3);
2030/// });
2031/// ```
2032pub unsafe fn spawn_scoped_local<'a, T>(future: impl Future<Output = T> + 'a) -> ScopedTask<'a, T> {
2033    executor().spawn_scoped_local(future)
2034}
2035
2036/// Spawns a task onto the current single-threaded executor, in a particular
2037/// task queue
2038///
2039/// If called from a [`LocalExecutor`], the task is spawned on it.
2040///
2041/// Otherwise, this method panics.
2042///
2043/// Proxy to [`ExecutorProxy::spawn_scoped_local_into`]
2044///
2045/// # Safety
2046///
2047/// `ScopedTask` depends on `drop` running or `.await` being called for
2048/// safety. See the struct [`ScopedTask`] for details.
2049///
2050/// # Examples
2051///
2052/// ```
2053/// use glommio::{LocalExecutor, Shares};
2054///
2055/// let local_ex = LocalExecutor::default();
2056/// local_ex.run(async {
2057///     let handle = glommio::executor().create_task_queue(
2058///         Shares::default(),
2059///         glommio::Latency::NotImportant,
2060///         "test_queue",
2061///     );
2062///     let non_static = 2;
2063///     let task = unsafe {
2064///         glommio::spawn_scoped_local_into(async { 1 + non_static }, handle)
2065///             .expect("failed to spawn task")
2066///     };
2067///     assert_eq!(task.await, 3);
2068/// })
2069/// ```
2070pub unsafe fn spawn_scoped_local_into<'a, T>(
2071    future: impl Future<Output = T> + 'a,
2072    handle: TaskQueueHandle,
2073) -> Result<ScopedTask<'a, T>> {
2074    executor().spawn_scoped_local_into(future, handle)
2075}
2076
/// A proxy struct to the underlying [`LocalExecutor`]. It is accessible from
/// anywhere within a Glommio context using [`executor()`].
///
/// The struct itself carries no state: its methods look the executor up
/// through `LOCAL_EX` each time they are called.
#[derive(Debug)]
pub struct ExecutorProxy {}
2081
2082impl ExecutorProxy {
2083    /// Checks if this task has run for too long and need to be preempted. This
2084    /// is useful for situations where we can't call .await, for instance,
2085    /// if a [`RefMut`] is held. If this tests true, then the user is
2086    /// responsible for making any preparations necessary for calling .await
2087    /// and doing it themselves.
2088    ///
2089    /// # Examples
2090    ///
2091    /// ```
2092    /// use glommio::LocalExecutorBuilder;
2093    ///
2094    /// let ex = LocalExecutorBuilder::default()
2095    ///     .spawn(|| async {
2096    ///         loop {
2097    ///             if glommio::executor().need_preempt() {
2098    ///                 break;
2099    ///             }
2100    ///         }
2101    ///     })
2102    ///     .unwrap();
2103    ///
2104    /// ex.join().unwrap();
2105    /// ```
2106    ///
2107    /// [`RefMut`]: https://doc.rust-lang.org/std/cell/struct.RefMut.html
2108    #[inline(always)]
2109    pub fn need_preempt(&self) -> bool {
2110        #[cfg(not(feature = "native-tls"))]
2111        return LOCAL_EX.with(|local_ex| local_ex.need_preempt());
2112
2113        #[cfg(feature = "native-tls")]
2114        return unsafe {
2115            LOCAL_EX
2116                .as_ref()
2117                .map(|ex| ex.need_preempt())
2118                .unwrap_or_default()
2119        };
2120    }
2121
2122    /// Conditionally yields the current task queue. The scheduler may then
2123    /// process other task queues according to their latency requirements.
2124    /// If a call to this function results in the current queue to yield,
2125    /// then the calling task is moved to the back of the yielded task
2126    /// queue.
2127    ///
2128    /// Under which condition this function yield is an implementation detail
2129    /// subject to change, but it will always be somehow related to the latency
2130    /// guarantees that the task queues want to uphold in their
2131    /// `Latency::Matters` parameter (or `Latency::NotImportant`).
2132    ///
2133    /// This function is the central mechanism of task cooperation in Glommio
2134    /// and should be preferred over unconditional yielding methods like
2135    /// [`ExecutorProxy::yield_now`] and
2136    /// [`ExecutorProxy::yield_task_queue_now`].
2137    #[inline(always)]
2138    pub async fn yield_if_needed(&self) {
2139        #[cfg(not(feature = "native-tls"))]
2140        {
2141            let need_yield = if LOCAL_EX.is_set() {
2142                LOCAL_EX.with(|local_ex| {
2143                    if local_ex.need_preempt() {
2144                        local_ex.mark_me_for_yield();
2145                        true
2146                    } else {
2147                        false
2148                    }
2149                })
2150            } else {
2151                // We are not in a glommio context
2152                false
2153            };
2154
2155            if need_yield {
2156                futures_lite::future::yield_now().await;
2157            }
2158        }
2159
2160        #[cfg(feature = "native-tls")]
2161        unsafe {
2162            if self.need_preempt() {
2163                (*LOCAL_EX).mark_me_for_yield();
2164                futures_lite::future::yield_now().await;
2165            }
2166        }
2167    }
2168
2169    /// Unconditionally yields the current task and forces the scheduler
2170    /// to poll another task within the current task queue.
2171    /// Calling this wakes the current task and returns [`Poll::Pending`] once.
2172    ///
2173    /// Unless you know you need to yield right now, using
2174    /// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
2175    #[inline(always)]
2176    pub async fn yield_now(&self) {
2177        futures_lite::future::yield_now().await
2178    }
2179
2180    /// Unconditionally yields the current task queue and forces the scheduler
2181    /// to poll another queue. Use [`ExecutorProxy::yield_now`] to yield within
2182    /// a queue.
2183    ///
2184    /// Unless you know you need to yield right now, using
2185    /// [`ExecutorProxy::yield_if_needed`] instead is the better choice.
2186    #[inline(always)]
2187    pub async fn yield_task_queue_now(&self) {
2188        #[cfg(not(feature = "native-tls"))]
2189        {
2190            if LOCAL_EX.is_set() {
2191                LOCAL_EX.with(|local_ex| {
2192                    local_ex.mark_me_for_yield();
2193                })
2194            }
2195            futures_lite::future::yield_now().await;
2196        }
2197
2198        #[cfg(feature = "native-tls")]
2199        {
2200            if let Some(local_ex) = unsafe { LOCAL_EX.as_ref() } {
2201                local_ex.mark_me_for_yield();
2202            }
2203            futures_lite::future::yield_now().await;
2204        }
2205    }
2206
2207    #[inline(always)]
2208    pub(crate) fn reactor(&self) -> Rc<reactor::Reactor> {
2209        #[cfg(not(feature = "native-tls"))]
2210        return LOCAL_EX.with(|local_ex| local_ex.get_reactor());
2211
2212        #[cfg(feature = "native-tls")]
2213        return unsafe {
2214            LOCAL_EX
2215                .as_ref()
2216                .expect("this thread doesn't have a LocalExecutor running")
2217                .get_reactor()
2218        };
2219    }
2220
2221    /// Returns the id of the current executor
2222    ///
2223    /// If called from a [`LocalExecutor`], returns the id of the executor.
2224    ///
2225    /// Otherwise, this method panics.
2226    ///
2227    /// # Examples
2228    ///
2229    /// ```
2230    /// use glommio::{LocalExecutor, Task};
2231    ///
2232    /// let local_ex = LocalExecutor::default();
2233    ///
2234    /// local_ex.run(async {
2235    ///     println!("my ID: {}", glommio::executor().id());
2236    /// });
2237    /// ```
2238    pub fn id(&self) -> usize {
2239        #[cfg(not(feature = "native-tls"))]
2240        return LOCAL_EX.with(|local_ex| local_ex.id());
2241
2242        #[cfg(feature = "native-tls")]
2243        return unsafe {
2244            LOCAL_EX
2245                .as_ref()
2246                .expect("this thread doesn't have a LocalExecutor running")
2247                .id()
2248        };
2249    }
2250
2251    /// Creates a new task queue, with a given latency hint and the provided
2252    /// name
2253    ///
2254    /// Each task queue is scheduled based on the [`Shares`] and [`Latency`]
2255    /// system, and tasks within a queue will be scheduled in serial.
2256    ///
2257    /// Returns an opaque handle that can later be used to launch tasks into
2258    /// that queue with [`local_into`].
2259    ///
2260    /// # Examples
2261    ///
2262    /// ```
2263    /// use glommio::{Latency, LocalExecutor, Shares};
2264    /// use std::time::Duration;
2265    ///
2266    /// let local_ex = LocalExecutor::default();
2267    /// local_ex.run(async move {
2268    ///     let task_queue = glommio::executor().create_task_queue(
2269    ///         Shares::default(),
2270    ///         Latency::Matters(Duration::from_secs(1)),
2271    ///         "my_tq",
2272    ///     );
2273    ///     let task = glommio::spawn_local_into(
2274    ///         async {
2275    ///             println!("Hello world");
2276    ///         },
2277    ///         task_queue,
2278    ///     )
2279    ///     .expect("failed to spawn task");
2280    /// });
2281    /// ```
2282    ///
2283    /// [`local_into`]: crate::spawn_local_into
2284    /// [`Shares`]: enum.Shares.html
2285    /// [`Latency`]: enum.Latency.html
2286    pub fn create_task_queue(
2287        &self,
2288        shares: Shares,
2289        latency: Latency,
2290        name: &str,
2291    ) -> TaskQueueHandle {
2292        #[cfg(not(feature = "native-tls"))]
2293        return LOCAL_EX.with(|local_ex| local_ex.create_task_queue(shares, latency, name));
2294
2295        #[cfg(feature = "native-tls")]
2296        return unsafe {
2297            LOCAL_EX
2298                .as_ref()
2299                .expect("this thread doesn't have a LocalExecutor running")
2300                .create_task_queue(shares, latency, name)
2301        };
2302    }
2303
2304    /// Returns the [`TaskQueueHandle`] that represents the TaskQueue currently
2305    /// running. This can be passed directly into [`crate::spawn_local_into`].
2306    /// This must be run from a task that was generated through
2307    /// [`crate::spawn_local`] or [`crate::spawn_local_into`]
2308    ///
2309    /// # Examples
2310    /// ```
2311    /// use glommio::{Latency, LocalExecutor, LocalExecutorBuilder, Shares};
2312    ///
2313    /// let ex = LocalExecutorBuilder::default()
2314    ///     .spawn(|| async move {
2315    ///         let original_tq = glommio::executor().current_task_queue();
2316    ///         let new_tq = glommio::executor().create_task_queue(
2317    ///             Shares::default(),
2318    ///             Latency::NotImportant,
2319    ///             "test",
2320    ///         );
2321    ///
2322    ///         let task = glommio::spawn_local_into(
2323    ///             async move {
2324    ///                 glommio::spawn_local_into(
2325    ///                     async move {
2326    ///                         assert_eq!(glommio::executor().current_task_queue(), original_tq);
2327    ///                     },
2328    ///                     original_tq,
2329    ///                 )
2330    ///                 .unwrap();
2331    ///             },
2332    ///             new_tq,
2333    ///         )
2334    ///         .unwrap();
2335    ///         task.await;
2336    ///     })
2337    ///     .unwrap();
2338    ///
2339    /// ex.join().unwrap();
2340    /// ```
2341    pub fn current_task_queue(&self) -> TaskQueueHandle {
2342        #[cfg(not(feature = "native-tls"))]
2343        return LOCAL_EX.with(|local_ex| local_ex.current_task_queue());
2344
2345        #[cfg(feature = "native-tls")]
2346        return unsafe {
2347            LOCAL_EX
2348                .as_ref()
2349                .expect("this thread doesn't have a LocalExecutor running")
2350                .current_task_queue()
2351        };
2352    }
2353
2354    /// Returns a [`Result`] with its `Ok` value wrapping a [`TaskQueueStats`]
2355    /// or a [`GlommioError`] of type `[QueueErrorKind`] if there is no task
2356    /// queue with this handle
2357    ///
2358    /// # Examples
2359    /// ```
2360    /// use glommio::{Latency, LocalExecutorBuilder, Shares};
2361    ///
2362    /// let ex = LocalExecutorBuilder::default()
2363    ///     .spawn(|| async move {
2364    ///         let new_tq = glommio::executor().create_task_queue(
2365    ///             Shares::default(),
2366    ///             Latency::NotImportant,
2367    ///             "test",
2368    ///         );
2369    ///         println!(
2370    ///             "Stats for test: {:?}",
2371    ///             glommio::executor().task_queue_stats(new_tq).unwrap()
2372    ///         );
2373    ///     })
2374    ///     .unwrap();
2375    ///
2376    /// ex.join().unwrap();
2377    /// ```
2378    ///
2379    /// [`ExecutorStats`]: struct.ExecutorStats.html
2380    /// [`GlommioError`]: crate::error::GlommioError
2381    /// [`QueueErrorKind`]: crate::error::QueueErrorKind
2382    /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
2383    pub fn task_queue_stats(&self, handle: TaskQueueHandle) -> Result<TaskQueueStats> {
2384        #[cfg(not(feature = "native-tls"))]
2385        return LOCAL_EX.with(|local_ex| match local_ex.get_queue(&handle) {
2386            Some(x) => Ok(x.borrow_mut().stats.take()),
2387            None => Err(GlommioError::queue_not_found(handle.index)),
2388        });
2389
2390        #[cfg(feature = "native-tls")]
2391        return match unsafe {
2392            LOCAL_EX
2393                .as_ref()
2394                .expect("this thread doesn't have a LocalExecutor running")
2395                .get_queue(&handle)
2396        } {
2397            Some(x) => Ok(x.borrow_mut().stats.take()),
2398            None => Err(GlommioError::queue_not_found(handle.index)),
2399        };
2400    }
2401
2402    /// Returns a collection of [`TaskQueueStats`] with information about all
2403    /// task queues in the system
2404    ///
2405    /// The collection can be anything that implements [`Extend`] and it is
2406    /// initially passed by the user, so they can control how allocations are
2407    /// done.
2408    ///
2409    /// # Examples
2410    /// ```
2411    /// use glommio::{executor, Latency, LocalExecutorBuilder, Shares};
2412    ///
2413    /// let ex = LocalExecutorBuilder::default()
2414    ///     .spawn(|| async move {
2415    ///         let new_tq = glommio::executor().create_task_queue(
2416    ///             Shares::default(),
2417    ///             Latency::NotImportant,
2418    ///             "test",
2419    ///         );
2420    ///         let v = Vec::new();
2421    ///         println!(
2422    ///             "Stats for all queues: {:?}",
2423    ///             glommio::executor().all_task_queue_stats(v)
2424    ///         );
2425    ///     })
2426    ///     .unwrap();
2427    ///
2428    /// ex.join().unwrap();
2429    /// ```
2430    ///
2431    /// [`ExecutorStats`]: struct.ExecutorStats.html
2432    /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
2433    /// [`Extend`]: https://doc.rust-lang.org/std/iter/trait.Extend.html
2434    pub fn all_task_queue_stats<V>(&self, mut output: V) -> V
2435    where
2436        V: Extend<TaskQueueStats>,
2437    {
2438        #[cfg(not(feature = "native-tls"))]
2439        LOCAL_EX.with(|local_ex| {
2440            output.extend(
2441                local_ex
2442                    .queues
2443                    .borrow()
2444                    .available_executors
2445                    .values()
2446                    .map(|x| x.borrow_mut().stats.take()),
2447            );
2448        });
2449
2450        #[cfg(feature = "native-tls")]
2451        output.extend(unsafe {
2452            LOCAL_EX
2453                .as_ref()
2454                .expect("this thread doesn't have a LocalExecutor running")
2455                .queues
2456                .borrow()
2457                .available_executors
2458                .values()
2459                .map(|x| x.borrow_mut().stats.take())
2460        });
2461
2462        output
2463    }
2464
2465    /// Returns a [`ExecutorStats`] struct with information about this Executor
2466    ///
2467    /// # Examples:
2468    ///
2469    /// ```
2470    /// use glommio::{executor, LocalExecutorBuilder};
2471    ///
2472    /// let ex = LocalExecutorBuilder::default()
2473    ///     .spawn(|| async move {
2474    ///         println!(
2475    ///             "Stats for executor: {:?}",
2476    ///             glommio::executor().executor_stats()
2477    ///         );
2478    ///     })
2479    ///     .unwrap();
2480    ///
2481    /// ex.join().unwrap();
2482    /// ```
2483    ///
2484    /// [`ExecutorStats`]: struct.ExecutorStats.html
2485    pub fn executor_stats(&self) -> ExecutorStats {
2486        #[cfg(not(feature = "native-tls"))]
2487        return LOCAL_EX.with(|local_ex| std::mem::take(&mut local_ex.queues.borrow_mut().stats));
2488
2489        #[cfg(feature = "native-tls")]
2490        return std::mem::take(unsafe {
2491            &mut LOCAL_EX
2492                .as_ref()
2493                .expect("this thread doesn't have a LocalExecutor running")
2494                .queues
2495                .borrow_mut()
2496                .stats
2497        });
2498    }
2499
2500    /// Returns an [`IoStats`] struct with information about IO performed by
2501    /// this executor's reactor
2502    ///
2503    /// # Examples:
2504    ///
2505    /// ```
2506    /// use glommio::LocalExecutorBuilder;
2507    ///
2508    /// let ex = LocalExecutorBuilder::default()
2509    ///     .spawn(|| async move {
2510    ///         println!("Stats for executor: {:?}", glommio::executor().io_stats());
2511    ///     })
2512    ///     .unwrap();
2513    ///
2514    /// ex.join().unwrap();
2515    /// ```
2516    ///
2517    /// [`IoStats`]: crate::IoStats
2518    pub fn io_stats(&self) -> IoStats {
2519        #[cfg(not(feature = "native-tls"))]
2520        return LOCAL_EX.with(|local_ex| local_ex.get_reactor().io_stats());
2521
2522        #[cfg(feature = "native-tls")]
2523        return unsafe {
2524            LOCAL_EX
2525                .as_ref()
2526                .expect("this thread doesn't have a LocalExecutor running")
2527                .get_reactor()
2528                .io_stats()
2529        };
2530    }
2531
2532    /// Returns an [`IoStats`] struct with information about IO performed from
2533    /// the provided TaskQueue by this executor's reactor
2534    ///
2535    /// # Examples:
2536    ///
2537    /// ```
2538    /// use glommio::{Latency, LocalExecutorBuilder, Shares};
2539    ///
2540    /// let ex = LocalExecutorBuilder::default()
2541    ///     .spawn(|| async move {
2542    ///         let new_tq = glommio::executor().create_task_queue(
2543    ///             Shares::default(),
2544    ///             Latency::NotImportant,
2545    ///             "test",
2546    ///         );
2547    ///         println!(
2548    ///             "Stats for executor: {:?}",
2549    ///             glommio::executor().task_queue_io_stats(new_tq)
2550    ///         );
2551    ///     })
2552    ///     .unwrap();
2553    ///
2554    /// ex.join().unwrap();
2555    /// ```
2556    ///
2557    /// [`IoStats`]: crate::IoStats
2558    pub fn task_queue_io_stats(&self, handle: TaskQueueHandle) -> Result<IoStats> {
2559        #[cfg(not(feature = "native-tls"))]
2560        return LOCAL_EX.with(|local_ex| {
2561            match local_ex.get_reactor().task_queue_io_stats(&handle) {
2562                Some(x) => Ok(x),
2563                None => Err(GlommioError::queue_not_found(handle.index)),
2564            }
2565        });
2566
2567        #[cfg(feature = "native-tls")]
2568        return match unsafe {
2569            LOCAL_EX
2570                .as_ref()
2571                .expect("this thread doesn't have a LocalExecutor running")
2572                .get_reactor()
2573                .task_queue_io_stats(&handle)
2574        } {
2575            Some(x) => Ok(x),
2576            None => Err(GlommioError::queue_not_found(handle.index)),
2577        };
2578    }
2579
2580    /// Spawns a task onto the current single-threaded executor.
2581    ///
2582    /// If called from a [`LocalExecutor`], the task is spawned on it.
2583    /// Otherwise, this method panics.
2584    ///
2585    /// Note that there is no guarantee of when the spawned task is scheduled.
2586    /// The current task can continue its execution or be preempted by the
2587    /// newly spawned task immediately. See the documentation for the
2588    /// top-level [`Task`] for examples.
2589    ///
2590    /// # Examples
2591    ///
2592    /// ```
2593    /// use glommio::{LocalExecutor, Task};
2594    ///
2595    /// let local_ex = LocalExecutor::default();
2596    ///
2597    /// local_ex.run(async {
2598    ///     let task = glommio::executor().spawn_local(async { 1 + 2 });
2599    ///     assert_eq!(task.await, 3);
2600    /// });
2601    /// ```
2602    pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
2603    where
2604        T: 'static,
2605    {
2606        #[cfg(not(feature = "native-tls"))]
2607        return LOCAL_EX.with(|local_ex| Task::<T>(local_ex.spawn(future)));
2608
2609        #[cfg(feature = "native-tls")]
2610        return Task::<T>(unsafe {
2611            LOCAL_EX
2612                .as_ref()
2613                .expect("this thread doesn't have a LocalExecutor running")
2614                .spawn(future)
2615        });
2616    }
2617
2618    /// Spawns a task onto the current single-threaded executor, in a particular
2619    /// task queue
2620    ///
2621    /// If called from a [`LocalExecutor`], the task is spawned on it.
2622    /// Otherwise, this method panics.
2623    ///
2624    /// Note that there is no guarantee of when the spawned task is scheduled.
2625    /// The current task can continue its execution or be preempted by the
2626    /// newly spawned task immediately. See the documentation for the
2627    /// top-level [`Task`] for examples.
2628    ///
2629    /// # Examples
2630    ///
2631    /// ```
2632    /// # use glommio::{LocalExecutor, Shares, Task};
2633    ///
2634    /// # let local_ex = LocalExecutor::default();
2635    /// # local_ex.run(async {
2636    /// let handle = glommio::executor().create_task_queue(
2637    ///     Shares::default(),
2638    ///     glommio::Latency::NotImportant,
2639    ///     "test_queue",
2640    /// );
2641    /// let task = glommio::executor()
2642    ///     .spawn_local_into(async { 1 + 2 }, handle)
2643    ///     .expect("failed to spawn task");
2644    /// assert_eq!(task.await, 3);
2645    /// # });
2646    /// ```
2647    pub fn spawn_local_into<T>(
2648        &self,
2649        future: impl Future<Output = T> + 'static,
2650        handle: TaskQueueHandle,
2651    ) -> Result<Task<T>>
2652    where
2653        T: 'static,
2654    {
2655        #[cfg(not(feature = "native-tls"))]
2656        return LOCAL_EX.with(|local_ex| local_ex.spawn_into(future, handle).map(Task::<T>));
2657
2658        #[cfg(feature = "native-tls")]
2659        return unsafe {
2660            LOCAL_EX
2661                .as_ref()
2662                .expect("this thread doesn't have a LocalExecutor running")
2663                .spawn_into(future, handle)
2664        }
2665        .map(Task::<T>);
2666    }
2667
2668    /// Spawns a task onto the current single-threaded executor.
2669    ///
2670    /// If called from a [`LocalExecutor`], the task is spawned on it.
2671    ///
2672    /// Otherwise, this method panics.
2673    ///
2674    /// # Safety
2675    ///
2676    /// `ScopedTask` depends on `drop` running or `.await` being called for
2677    /// safety. See the struct [`ScopedTask`] for details.
2678    ///
2679    /// # Examples
2680    ///
2681    /// ```
2682    /// use glommio::LocalExecutor;
2683    ///
2684    /// let local_ex = LocalExecutor::default();
2685    ///
2686    /// local_ex.run(async {
2687    ///     let non_static = 2;
2688    ///     let task = unsafe { glommio::executor().spawn_scoped_local(async { 1 + non_static }) };
2689    ///     assert_eq!(task.await, 3);
2690    /// });
2691    /// ```
2692    pub unsafe fn spawn_scoped_local<'a, T>(
2693        &self,
2694        future: impl Future<Output = T> + 'a,
2695    ) -> ScopedTask<'a, T> {
2696        #[cfg(not(feature = "native-tls"))]
2697        return LOCAL_EX.with(|local_ex| ScopedTask::<'a, T>(local_ex.spawn(future), PhantomData));
2698
2699        #[cfg(feature = "native-tls")]
2700        return ScopedTask::<'a, T>(
2701            LOCAL_EX
2702                .as_ref()
2703                .expect("this thread doesn't have a LocalExecutor running")
2704                .spawn(future),
2705            PhantomData,
2706        );
2707    }
2708
2709    /// Spawns a task onto the current single-threaded executor, in a particular
2710    /// task queue
2711    ///
2712    /// If called from a [`LocalExecutor`], the task is spawned on it.
2713    ///
2714    /// Otherwise, this method panics.
2715    ///
2716    /// # Safety
2717    ///
2718    /// `ScopedTask` depends on `drop` running or `.await` being called for
2719    /// safety. See the struct [`ScopedTask`] for details.
2720    ///
2721    /// # Examples
2722    ///
2723    /// ```
2724    /// use glommio::{LocalExecutor, Shares};
2725    ///
2726    /// let local_ex = LocalExecutor::default();
2727    /// local_ex.run(async {
2728    ///     let handle = glommio::executor().create_task_queue(
2729    ///         Shares::default(),
2730    ///         glommio::Latency::NotImportant,
2731    ///         "test_queue",
2732    ///     );
2733    ///     let non_static = 2;
2734    ///     let task = unsafe {
2735    ///         glommio::executor()
2736    ///             .spawn_scoped_local_into(async { 1 + non_static }, handle)
2737    ///             .expect("failed to spawn task")
2738    ///     };
2739    ///     assert_eq!(task.await, 3);
2740    /// })
2741    /// ```
2742    pub unsafe fn spawn_scoped_local_into<'a, T>(
2743        &self,
2744        future: impl Future<Output = T> + 'a,
2745        handle: TaskQueueHandle,
2746    ) -> Result<ScopedTask<'a, T>> {
2747        #[cfg(not(feature = "native-tls"))]
2748        return LOCAL_EX.with(|local_ex| {
2749            local_ex
2750                .spawn_into(future, handle)
2751                .map(|x| ScopedTask::<'a, T>(x, PhantomData))
2752        });
2753
2754        #[cfg(feature = "native-tls")]
2755        return LOCAL_EX
2756            .as_ref()
2757            .expect("this thread doesn't have a LocalExecutor running")
2758            .spawn_into(future, handle)
2759            .map(|x| ScopedTask::<'a, T>(x, PhantomData));
2760    }
2761
2762    /// Spawns a blocking task into a background thread where blocking is
2763    /// acceptable.
2764    ///
2765    /// Glommio depends on cooperation from tasks in order to drive IO and meet
2766    /// latency requirements. Unyielding tasks are detrimental to the
2767    /// performance of the overall system, not just to the performance of
2768    /// the one stalling task.
2769    ///
2770    /// `spawn_blocking` is there as a last resort when a blocking task needs to
2771    /// be executed and cannot be made cooperative. Examples are:
2772    /// * Expensive syscalls that cannot use `io_uring`, such as `mmap`
2773    ///   (especially with `MAP_POPULATE`)
2774    /// * Calls to synchronous third-party code (compression, encoding, etc.)
2775    ///
2776    /// # Note
2777    ///
2778    /// *This method is not meant to be a way to achieve compute parallelism.*
2779    /// Distributing work across executors is the better way to achieve that.
2780    ///
2781    /// # Examples
2782    ///
2783    /// ```
2784    /// use glommio::{LocalExecutor, Task};
2785    /// use std::time::Duration;
2786    ///
2787    /// let local_ex = LocalExecutor::default();
2788    ///
2789    /// local_ex.run(async {
2790    ///     let task = glommio::executor()
2791    ///         .spawn_blocking(|| {
2792    ///             std::thread::sleep(Duration::from_millis(100));
2793    ///         })
2794    ///         .await;
2795    /// });
2796    /// ```
2797    pub fn spawn_blocking<F, R>(&self, func: F) -> impl Future<Output = R>
2798    where
2799        F: FnOnce() -> R + Send + 'static,
2800        R: Send + 'static,
2801    {
2802        let result = Arc::new(Mutex::new(MaybeUninit::<R>::uninit()));
2803        let f_inner = enclose::enclose!((result) move || {result.lock().unwrap().write(func());});
2804
2805        #[cfg(not(feature = "native-tls"))]
2806        let waiter =
2807            LOCAL_EX.with(move |local_ex| local_ex.reactor.run_blocking(Box::new(f_inner)));
2808
2809        #[cfg(feature = "native-tls")]
2810        let waiter = unsafe {
2811            LOCAL_EX
2812                .as_ref()
2813                .expect("this thread doesn't have a LocalExecutor running")
2814                .reactor
2815                .run_blocking(Box::new(f_inner))
2816        };
2817
2818        async move {
2819            let source = waiter.await;
2820            assert!(source.collect_rw().await.is_ok());
2821            unsafe {
2822                let res_arc = Arc::try_unwrap(result).expect("leak");
2823                let ret = std::mem::replace(
2824                    &mut *res_arc.lock().unwrap().deref_mut(),
2825                    MaybeUninit::<R>::uninit(),
2826                )
2827                .assume_init();
2828                ret
2829            }
2830        }
2831    }
2832}
2833
2834#[cfg(test)]
2835mod test {
2836    use core::mem::MaybeUninit;
2837    use std::{
2838        cell::Cell,
2839        collections::HashMap,
2840        sync::{
2841            atomic::{AtomicUsize, Ordering},
2842            Arc, Mutex,
2843        },
2844        task::Waker,
2845    };
2846
2847    use futures::{
2848        future::{join, join_all, poll_fn},
2849        join,
2850    };
2851
2852    use crate::{
2853        enclose,
2854        timer::{self, sleep, Timer},
2855        SharesManager,
2856    };
2857
2858    use super::*;
2859
2860    #[test]
2861    fn create_and_destroy_executor() {
2862        let mut var = Rc::new(RefCell::new(0));
2863        let local_ex = LocalExecutor::default();
2864        let varclone = var.clone();
2865        local_ex.run(async move {
2866            let mut m = varclone.borrow_mut();
2867            *m += 10;
2868        });
2869
2870        let v = Rc::get_mut(&mut var).unwrap();
2871        let v = v.replace(0);
2872        assert_eq!(v, 10);
2873    }
2874
2875    #[test]
2876    fn create_fail_to_bind() {
2877        // If you have a system with 4 billion CPUs let me know and I will
2878        // update this test.
2879        if LocalExecutorBuilder::new(Placement::Fixed(usize::MAX))
2880            .make()
2881            .is_ok()
2882        {
2883            unreachable!("Should have failed");
2884        }
2885    }
2886
    /// Checks which CPU id ranges `bind_to_cpu_set` accepts and rejects.
    #[test]
    fn bind_to_cpu_set_range() {
        // libc supports cpu ids up to 1023 and will use the intersection of the
        // values specified by the cpu mask and those present on the system:
        // https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html#NOTES
        assert!(bind_to_cpu_set(vec![0, 1, 2, 3]).is_ok());
        // Highest id requested here is 1023.
        assert!(bind_to_cpu_set(0..1024).is_ok());
        // Id 1024 is included here, and the call is expected to be rejected.
        assert!(bind_to_cpu_set(0..1025).is_err());
    }
2896
2897    #[test]
2898    fn create_and_bind() {
2899        if let Err(x) = LocalExecutorBuilder::new(Placement::Fixed(0)).make() {
2900            panic!("got error {:?}", x);
2901        }
2902    }
2903
2904    #[test]
2905    #[should_panic]
2906    fn spawn_without_executor() {
2907        let _ = LocalExecutor::default();
2908        std::mem::drop(crate::spawn_local(async move {}));
2909    }
2910
2911    #[test]
2912    fn invalid_task_queue() {
2913        let local_ex = LocalExecutor::default();
2914        local_ex.run(async {
2915            let task = crate::spawn_local_into(
2916                async move {
2917                    unreachable!("Should not have executed this");
2918                },
2919                TaskQueueHandle { index: 1 },
2920            );
2921
2922            if task.is_ok() {
2923                unreachable!("Should have failed");
2924            }
2925        });
2926    }
2927
2928    #[test]
2929    fn ten_yielding_queues() {
2930        let local_ex = LocalExecutor::default();
2931
2932        // 0 -> no one
2933        // 1 -> t1
2934        // 2 -> t2...
2935        let executed_last = Rc::new(RefCell::new(0));
2936        local_ex.run(async {
2937            let mut joins = Vec::with_capacity(10);
2938            for id in 1..11 {
2939                let exec = executed_last.clone();
2940                joins.push(crate::spawn_local(async move {
2941                    for _ in 0..10_000 {
2942                        {
2943                            let mut last = exec.borrow_mut();
2944                            assert_ne!(id, *last);
2945                            *last = id;
2946                        }
2947                        crate::executor().yield_task_queue_now().await;
2948                    }
2949                }));
2950            }
2951            futures::future::join_all(joins).await;
2952        });
2953    }
2954
    /// A busy task in a `Latency::NotImportant` queue must give way to a task
    /// in a queue created with `Latency::Matters(2ms)`.
    #[test]
    fn task_with_latency_requirements() {
        let local_ex = LocalExecutor::default();

        local_ex.run(async {
            let not_latency = crate::executor().create_task_queue(
                Shares::default(),
                Latency::NotImportant,
                "test",
            );
            let latency = crate::executor().create_task_queue(
                Shares::default(),
                Latency::Matters(Duration::from_millis(2)),
                "testlat",
            );

            // Set by the busy task as soon as it starts running.
            let nolat_started = Rc::new(RefCell::new(false));
            // Set by the latency-sensitive task once it got to run.
            let lat_status = Rc::new(RefCell::new(false));

            // Loop until need_preempt is set. The latency requirement is 2ms, but because
            // this is a test and can be running overcommitted or in whichever shared
            // infrastructure, we are generous: the check below (`as_secs() > 1`) only gives
            // up once two full seconds have elapsed. If it didn't fire by then, that's
            // broken.
            let nolat = local_ex
                .spawn_into(
                    crate::enclose! { (nolat_started, lat_status)
                        async move {
                            *(nolat_started.borrow_mut()) = true;

                            let start = Instant::now();
                            // Now busy loop and make sure that we yield when we have to.
                            loop {
                                if *(lat_status.borrow()) {
                                    break; // Success!
                                }
                                if start.elapsed().as_secs() > 1 {
                                    panic!("Never received preempt signal");
                                }
                                crate::yield_if_needed().await;
                            }
                        }
                    },
                    not_latency,
                )
                .unwrap();

            let lat = local_ex
                .spawn_into(
                    crate::enclose! { (nolat_started, lat_status)
                        async move {
                            // In case we are executed first, yield to the other task
                            // until it reports that it has started.
                            loop {
                                if !(*(nolat_started.borrow())) {
                                    crate::executor().yield_task_queue_now().await;
                                } else {
                                    break;
                                }
                            }
                            // Lets the busy task leave its loop.
                            *(lat_status.borrow_mut()) = true;
                        }
                    },
                    latency,
                )
                .unwrap();

            futures::join!(nolat, lat);
        });
    }
3022
3023    #[test]
3024    fn current_task_queue_matches() {
3025        let local_ex = LocalExecutor::default();
3026        local_ex.run(async {
3027            let tq1 = crate::executor().create_task_queue(
3028                Shares::default(),
3029                Latency::NotImportant,
3030                "test1",
3031            );
3032            let tq2 = crate::executor().create_task_queue(
3033                Shares::default(),
3034                Latency::NotImportant,
3035                "test2",
3036            );
3037
3038            let id1 = tq1.index;
3039            let id2 = tq2.index;
3040            let j0 = crate::spawn_local(async {
3041                assert_eq!(crate::executor().current_task_queue().index, 0);
3042            });
3043            let j1 = crate::spawn_local_into(
3044                async move {
3045                    assert_eq!(crate::executor().current_task_queue().index, id1);
3046                },
3047                tq1,
3048            )
3049            .unwrap();
3050            let j2 = crate::spawn_local_into(
3051                async move {
3052                    assert_eq!(crate::executor().current_task_queue().index, id2);
3053                },
3054                tq2,
3055            )
3056            .unwrap();
3057            futures::join!(j0, j1, j2);
3058        })
3059    }
3060
    /// With two `Latency::NotImportant` queues, a task that only calls
    /// `yield_if_needed` is expected to keep running rather than be overtaken
    /// by a task in the other queue.
    #[test]
    fn task_optimized_for_throughput() {
        let local_ex = LocalExecutor::default();

        local_ex.run(async {
            let tq1 = crate::executor().create_task_queue(
                Shares::default(),
                Latency::NotImportant,
                "test",
            );
            let tq2 = crate::executor().create_task_queue(
                Shares::default(),
                Latency::NotImportant,
                "testlat",
            );

            // Despite their names, both cells are iteration counters: one per task.
            let first_started = Rc::new(RefCell::new(0));
            let second_status = Rc::new(RefCell::new(0));

            let first = local_ex
                .spawn_into(
                    crate::enclose! { (first_started, second_status)
                        async move {
                            let start = Instant::now();
                            // Busy loop for roughly 100ms, offering to yield on every
                            // iteration.
                            loop {
                                {
                                    let mut count = first_started.borrow_mut();
                                    *count += 1;

                                    if start.elapsed().as_millis() >= 99 {
                                        break;
                                    }

                                    // The other task overtaking our iteration count means
                                    // it ran in our place while we were still looping.
                                    if *count < *(second_status.borrow()) {
                                        panic!("I was preempted but should not have been");
                                    }
                                }
                                crate::yield_if_needed().await;
                            }
                        }
                    },
                    tq1,
                )
                .unwrap();

            let second = local_ex
                .spawn_into(
                    crate::enclose! { (first_started, second_status)
                        async move {
                            // Count one iteration per run and finish as soon as the first
                            // task is ahead of us; until then, yield the queue.
                            loop {
                                {
                                    let mut count = second_status.borrow_mut();
                                    *count += 1;
                                    if *count < *(first_started.borrow()) {
                                       break;
                                    }
                                }
                                crate::executor().yield_task_queue_now().await;
                            }
                        }
                    },
                    tq2,
                )
                .unwrap();

            futures::join!(first, second);
        });
    }
3131
3132    #[test]
3133    fn test_detach() {
3134        let ex = LocalExecutor::default();
3135
3136        ex.run(async {
3137            crate::spawn_local(async {
3138                loop {
3139                    crate::executor().yield_task_queue_now().await;
3140                }
3141            })
3142            .detach();
3143
3144            Timer::new(Duration::from_micros(100)).await;
3145        });
3146    }
3147
3148    /// As far as impl From<libc::timeval> for Duration is not allowed.
3149    fn from_timeval(v: libc::timeval) -> Duration {
3150        Duration::from_secs(v.tv_sec as u64) + Duration::from_micros(v.tv_usec as u64)
3151    }
3152
3153    fn getrusage() -> Duration {
3154        let mut s0 = MaybeUninit::<libc::rusage>::uninit();
3155        let err = unsafe { libc::getrusage(libc::RUSAGE_THREAD, s0.as_mut_ptr()) };
3156        if err != 0 {
3157            panic!("getrusage error = {}", err);
3158        }
3159        let usage = unsafe { s0.assume_init() };
3160        from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime)
3161    }
3162
3163    #[test]
3164    fn test_no_spin() {
3165        let ex = LocalExecutor::default();
3166        let task_queue = ex.create_task_queue(
3167            Shares::default(),
3168            Latency::Matters(Duration::from_millis(10)),
3169            "my_tq",
3170        );
3171        let start = getrusage();
3172        ex.run(async {
3173            crate::spawn_local_into(
3174                async { timer::sleep(Duration::from_secs(1)).await },
3175                task_queue,
3176            )
3177            .expect("failed to spawn task")
3178            .await;
3179        });
3180
3181        assert!(
3182            getrusage() - start < Duration::from_millis(10),
3183            "expected user time on LE is less than 10 millisecond"
3184        );
3185    }
3186
3187    #[test]
3188    fn test_spin() {
3189        let dur = Duration::from_secs(1);
3190        let ex0 = LocalExecutorBuilder::default().make().unwrap();
3191        ex0.run(async {
3192            let ex0_ru_start = getrusage();
3193            timer::sleep(dur).await;
3194            let ex0_ru_finish = getrusage();
3195
3196            assert!(
3197                ex0_ru_finish - ex0_ru_start < Duration::from_millis(10),
3198                "expected user time on LE0 is less than 10 millisecond"
3199            );
3200        });
3201
3202        let ex = LocalExecutorBuilder::new(Placement::Fixed(0))
3203            .spin_before_park(Duration::from_millis(100))
3204            .make()
3205            .unwrap();
3206
3207        ex.run(async {
3208            let ex_ru_start = getrusage();
3209            timer::sleep(dur).await;
3210            let ex_ru_finish = getrusage();
3211
3212            // 100 ms may have passed without us running for 100ms in case
3213            // there are other threads. Need to be a bit more relaxed
3214            assert!(
3215                ex_ru_finish - ex_ru_start >= Duration::from_millis(50),
3216                "expected user time on LE is much greater than 50 millisecond",
3217            );
3218        });
3219    }
3220
    /// Checks `executor_stats().total_runtime()` at three points: right at
    /// the start, after 200ms of busy work, and after an additional 2s sleep.
    /// The same sequence runs on a default executor and on one that spins
    /// before parking.
    #[test]
    fn test_runtime_stats() {
        let dur = Duration::from_secs(2);
        let ex0 = LocalExecutorBuilder::default().make().unwrap();
        ex0.run(async {
            // Nothing has been accounted yet when the first task starts.
            assert!(
                crate::executor().executor_stats().total_runtime() < Duration::from_nanos(10),
                "expected runtime on LE {:#?} is less than 10 ns",
                crate::executor().executor_stats().total_runtime()
            );

            // Burn 200ms, then yield; the assertion below expects that time to
            // show up in the stats afterwards.
            let now = Instant::now();
            while now.elapsed().as_millis() < 200 {}
            crate::executor().yield_task_queue_now().await;
            assert!(
                crate::executor().executor_stats().total_runtime() >= Duration::from_millis(200),
                "expected runtime on LE0 {:#?} is greater than 200 ms",
                crate::executor().executor_stats().total_runtime()
            );

            // Time spent sleeping must not be counted as runtime.
            timer::sleep(dur).await;
            assert!(
                crate::executor().executor_stats().total_runtime() < Duration::from_millis(400),
                "expected runtime on LE0 {:#?} is not greater than 400 ms",
                crate::executor().executor_stats().total_runtime()
            );
        });

        let ex = LocalExecutorBuilder::new(Placement::Fixed(0))
            // Spin for longer than the 2s sleep below, so the whole sleep is
            // spent spinning.
            .spin_before_park(Duration::from_secs(5))
            .make()
            .unwrap();
        ex.run(async {
            crate::spawn_local(async move {
                assert!(
                    crate::executor().executor_stats().total_runtime() < Duration::from_nanos(10),
                    "expected runtime on LE {:#?} is less than 10 ns",
                    crate::executor().executor_stats().total_runtime()
                );

                let now = Instant::now();
                while now.elapsed().as_millis() < 200 {}
                crate::executor().yield_task_queue_now().await;
                assert!(
                    crate::executor().executor_stats().total_runtime()
                        >= Duration::from_millis(200),
                    "expected runtime on LE {:#?} is greater than 200 ms",
                    crate::executor().executor_stats().total_runtime()
                );
                // Spinning while the task sleeps must not count as runtime either.
                timer::sleep(dur).await;
                assert!(
                    crate::executor().executor_stats().total_runtime() < Duration::from_millis(400),
                    "expected runtime on LE {:#?} is not greater than 400 ms",
                    crate::executor().executor_stats().total_runtime()
                );
            })
            .await;
        });
    }
3281
3282    // Spin for 2ms and then yield. How many shares we have should control how many
3283    // quantas we manage to execute.
3284    async fn work_quanta() {
3285        let now = Instant::now();
3286        while now.elapsed().as_millis() < 2 {}
3287        crate::executor().yield_task_queue_now().await;
3288    }
3289
    /// Expands to the body of a test that runs two task queues with static
    /// shares `$s1` and `$s2` for five seconds each, executing `$work` in a
    /// loop, and asserts that the second queue's fraction of all completed
    /// iterations is within 0.1 of `$s2 / ($s1 + $s2)`.
    macro_rules! test_static_shares {
        ( $s1:expr, $s2:expr, $work:block ) => {
            let local_ex = LocalExecutor::default();

            local_ex.run(async {
                // Run a latency queue, otherwise a queue will run for too long uninterrupted
                // and we'd have to run this test for a very long time for things to equalize.
                let tq1 = crate::executor().create_task_queue(
                    Shares::Static($s1),
                    Latency::Matters(Duration::from_millis(1)),
                    "test_1",
                );
                let tq2 = crate::executor().create_task_queue(
                    Shares::Static($s2),
                    Latency::Matters(Duration::from_millis(1)),
                    "test_2",
                );

                // Number of `$work` iterations each queue completed.
                let tq1_count = Rc::new(Cell::new(0));
                let tq2_count = Rc::new(Cell::new(0));
                // Both tasks measure their five seconds from this same instant.
                let now = Instant::now();

                let t1 = crate::spawn_local_into(
                    enclose! { (tq1_count, now) async move {
                        while now.elapsed().as_secs() < 5 {
                            $work;
                            tq1_count.replace(tq1_count.get() + 1);
                        }
                    }},
                    tq1,
                )
                .unwrap();

                let t2 = crate::spawn_local_into(
                    enclose! { (tq2_count, now ) async move {
                        while now.elapsed().as_secs() < 5 {
                            $work;
                            tq2_count.replace(tq2_count.get() + 1);
                        }
                    }},
                    tq2,
                )
                .unwrap();

                join!(t1, t2);

                // Fraction of the total that belongs to the second queue, by shares
                // and by observed iterations respectively.
                let expected_ratio = $s2 as f64 / (($s2 + $s1) as f64);
                let actual_ratio =
                    tq2_count.get() as f64 / ((tq1_count.get() + tq2_count.get()) as f64);

                // Be gentle: we don't know if we're running against other threads, under which
                // conditions, etc
                assert!((expected_ratio - actual_ratio).abs() < 0.1);
            });
        };
    }
3346
    /// Static shares of 1000 versus 10, with each iteration spinning for 2ms
    /// before yielding.
    #[test]
    fn test_shares_high_disparity_fat_task() {
        test_static_shares!(1000, 10, { work_quanta().await });
    }
3351
    /// Equal static shares (1000 versus 1000), with each iteration spinning
    /// for 2ms before yielding.
    #[test]
    fn test_shares_low_disparity_fat_task() {
        test_static_shares!(1000, 1000, { work_quanta().await });
    }
3356
3357    #[test]
3358    fn test_allocate_dma_buffer() {
3359        LocalExecutor::default().run(async {
3360            let mut buffer = crate::allocate_dma_buffer(42);
3361            assert_eq!(buffer.len(), 42);
3362            buffer.as_bytes_mut()[0] = 12;
3363            buffer.as_bytes_mut()[12] = 13;
3364            assert_eq!(buffer.as_bytes_mut().len(), 42);
3365            assert_eq!(buffer.as_bytes()[0], 12);
3366            assert_eq!(buffer.as_bytes()[12], 13);
3367        });
3368    }
3369
3370    #[test]
3371    fn test_allocate_dma_buffer_global() {
3372        LocalExecutor::default().run(async {
3373            let mut buffer = crate::allocate_dma_buffer_global(42);
3374            assert_eq!(buffer.len(), 42);
3375            assert_eq!(buffer.len(), 42);
3376            buffer.as_bytes_mut()[0] = 12;
3377            buffer.as_bytes_mut()[12] = 13;
3378            assert_eq!(buffer.as_bytes_mut().len(), 42);
3379            assert_eq!(buffer.as_bytes()[0], 12);
3380            assert_eq!(buffer.as_bytes()[12], 13);
3381        });
3382    }
3383
3384    struct DynamicSharesTest {
3385        shares: Cell<usize>,
3386    }
3387
3388    impl DynamicSharesTest {
3389        fn new() -> Rc<Self> {
3390            Rc::new(Self {
3391                shares: Cell::new(0),
3392            })
3393        }
3394        fn tick(&self, millis: u64) {
3395            if millis < 1000 {
3396                self.shares.replace(1);
3397            } else {
3398                self.shares.replace(1000);
3399            }
3400        }
3401    }
3402
    // Exposes the share count held by `DynamicSharesTest` through the
    // `SharesManager` trait.
    impl SharesManager for DynamicSharesTest {
        // Returns whatever value the `shares` cell currently holds.
        fn shares(&self) -> usize {
            self.shares.get()
        }

        // Fixed period of 1ms.
        // NOTE(review): presumably how often the executor re-reads `shares`;
        // confirm against the `SharesManager` trait documentation.
        fn adjustment_period(&self) -> Duration {
            Duration::from_millis(1)
        }
    }
3412
3413    #[test]
3414    fn test_dynamic_shares() {
3415        let local_ex = LocalExecutor::default();
3416
3417        local_ex.run(async {
3418            let bm = DynamicSharesTest::new();
3419            // Reference task queue.
3420            let tq1 = crate::executor().create_task_queue(
3421                Shares::Static(1000),
3422                Latency::Matters(Duration::from_millis(1)),
3423                "test_1",
3424            );
3425            let tq2 = crate::executor().create_task_queue(
3426                Shares::Dynamic(bm.clone()),
3427                Latency::Matters(Duration::from_millis(1)),
3428                "test_2",
3429            );
3430
3431            let tq1_count = Rc::new(RefCell::new(vec![0, 0]));
3432            let tq2_count = Rc::new(RefCell::new(vec![0, 0]));
3433            let now = Instant::now();
3434
3435            let t1 = crate::spawn_local_into(
3436                enclose! { (tq1_count, now) async move {
3437                    loop {
3438                        let secs = now.elapsed().as_secs();
3439                        if secs >= 2 {
3440                            break;
3441                        }
3442                        (*tq1_count.borrow_mut())[secs as usize] += 1;
3443                        crate::executor().yield_task_queue_now().await;
3444                    }
3445                }},
3446                tq1,
3447            )
3448            .unwrap();
3449
3450            let t2 = crate::spawn_local_into(
3451                enclose! { (tq2_count, now, bm) async move {
3452                    loop {
3453                        let elapsed = now.elapsed();
3454                        let secs = elapsed.as_secs();
3455                        if secs >= 2 {
3456                            break;
3457                        }
3458                        bm.tick(elapsed.as_millis() as u64);
3459                        (*tq2_count.borrow_mut())[secs as usize] += 1;
3460                        crate::executor().yield_task_queue_now().await;
3461                    }
3462                }},
3463                tq2,
3464            )
3465            .unwrap();
3466
3467            join!(t1, t2);
3468            // Keep this very simple because every new processor, every load condition, will
3469            // yield different results. All we want to validate is: for a large
3470            // part of the first two seconds shares were very low, we should
3471            // have received very low ratio. On the second half we should have
3472            // accumulated much more. Real numbers are likely much higher than
3473            // the targets, but those targets are safe.
3474            let ratios: Vec<f64> = tq1_count
3475                .borrow()
3476                .iter()
3477                .zip(tq2_count.borrow().iter())
3478                .map(|(x, y)| *y as f64 / *x as f64)
3479                .collect();
3480            assert!(ratios[1] > ratios[0]);
3481            assert!(ratios[0] < 0.25);
3482            assert!(ratios[1] > 0.50);
3483        });
3484    }
3485
3486    #[test]
3487    fn multiple_spawn() {
3488        // Issue 241
3489        LocalExecutor::default().run(async {
3490            crate::spawn_local(async {}).detach().await;
3491            // In issue 241, the presence of the second detached waiter caused
3492            // the program to hang.
3493            crate::spawn_local(async {}).detach().await;
3494        });
3495    }
3496
3497    #[test]
3498    #[should_panic(expected = "Message!")]
3499    fn panic_is_not_list() {
3500        LocalExecutor::default().run(async { panic!("Message!") });
3501    }
3502
3503    struct TestFuture {
3504        w: Arc<Mutex<Option<Waker>>>,
3505    }
3506
3507    impl Future for TestFuture {
3508        type Output = ();
3509        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3510            let mut w = self.w.lock().unwrap();
3511            match w.take() {
3512                Some(_) => Poll::Ready(()),
3513                None => {
3514                    *w = Some(cx.waker().clone());
3515                    Poll::Pending
3516                }
3517            }
3518        }
3519    }
3520
3521    #[test]
3522    fn cross_executor_wake_by_ref() {
3523        let w = Arc::new(Mutex::new(None));
3524        let t = w.clone();
3525
3526        let fut = TestFuture { w };
3527
3528        let ex1 = LocalExecutorBuilder::default()
3529            .spawn(|| async move {
3530                fut.await;
3531            })
3532            .unwrap();
3533
3534        let ex2 = LocalExecutorBuilder::default()
3535            .spawn(|| async move {
3536                loop {
3537                    sleep(Duration::from_secs(1)).await;
3538                    let w = t.lock().unwrap();
3539                    if let Some(ref x) = *w {
3540                        x.wake_by_ref();
3541                        return;
3542                    }
3543                }
3544            })
3545            .unwrap();
3546
3547        ex1.join().unwrap();
3548        ex2.join().unwrap();
3549    }
3550
3551    #[test]
3552    fn cross_executor_wake_by_value() {
3553        let w = Arc::new(Mutex::new(None));
3554        let t = w.clone();
3555
3556        let fut = TestFuture { w };
3557
3558        let ex1 = LocalExecutorBuilder::default()
3559            .spawn(|| async move {
3560                fut.await;
3561            })
3562            .unwrap();
3563
3564        let ex2 = LocalExecutorBuilder::default()
3565            .spawn(|| async move {
3566                loop {
3567                    sleep(Duration::from_secs(1)).await;
3568                    let w = t.lock().unwrap();
3569                    if let Some(x) = w.clone() {
3570                        x.wake();
3571                        return;
3572                    }
3573                }
3574            })
3575            .unwrap();
3576
3577        ex1.join().unwrap();
3578        ex2.join().unwrap();
3579    }
3580
3581    // Wakes up the waker in a remote executor
3582    #[test]
3583    fn cross_executor_wake_with_join_handle() {
3584        let w = Arc::new(Mutex::new(None));
3585        let t = w.clone();
3586
3587        let fut = TestFuture { w };
3588
3589        let ex1 = LocalExecutorBuilder::default()
3590            .spawn(|| async move {
3591                let x = crate::spawn_local(fut).detach();
3592                x.await;
3593            })
3594            .unwrap();
3595
3596        let ex2 = LocalExecutorBuilder::default()
3597            .spawn(|| async move {
3598                loop {
3599                    sleep(Duration::from_secs(1)).await;
3600                    let w = t.lock().unwrap();
3601                    if let Some(x) = w.clone() {
3602                        x.wake();
3603                        return;
3604                    }
3605                }
3606            })
3607            .unwrap();
3608
3609        ex1.join().unwrap();
3610        ex2.join().unwrap();
3611    }
3612
3613    // The other side won't be alive to get the notification. We should still
3614    // survive.
3615    #[test]
3616    fn cross_executor_wake_early_drop() {
3617        let w = Arc::new(Mutex::new(None));
3618        let t = w.clone();
3619
3620        let fut = TestFuture { w };
3621
3622        let ex1 = LocalExecutorBuilder::default()
3623            .spawn(|| async move {
3624                let _drop = futures_lite::future::poll_once(fut).await;
3625            })
3626            .unwrap();
3627
3628        let ex2 = LocalExecutorBuilder::default()
3629            .spawn(|| async move {
3630                loop {
3631                    sleep(Duration::from_secs(1)).await;
3632                    let w = t.lock().unwrap();
3633                    if let Some(ref x) = *w {
3634                        x.wake_by_ref();
3635                        return;
3636                    }
3637                }
3638            })
3639            .unwrap();
3640
3641        ex1.join().unwrap();
3642        ex2.join().unwrap();
3643    }
3644
3645    // The other side won't be alive to get the notification and even worse, we hold
3646    // a waker that we notify after the first executor is surely dead. We should
3647    // still survive.
3648    #[test]
3649    fn cross_executor_wake_hold_waker() {
3650        let w = Arc::new(Mutex::new(None));
3651        let t = w.clone();
3652
3653        let fut = TestFuture { w };
3654
3655        let ex1 = LocalExecutorBuilder::default()
3656            .spawn(|| async move {
3657                let _drop = futures_lite::future::poll_once(fut).await;
3658            })
3659            .unwrap();
3660        ex1.join().unwrap();
3661
3662        let ex2 = LocalExecutorBuilder::default()
3663            .spawn(|| async move {
3664                let w = t.lock().unwrap().clone().unwrap();
3665                w.wake_by_ref();
3666            })
3667            .unwrap();
3668
3669        ex2.join().unwrap();
3670    }
3671
3672    #[test]
3673    fn executor_pool_builder() {
3674        let nr_cpus = 4;
3675
3676        let count = Arc::new(AtomicUsize::new(0));
3677        let handles = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_cpus))
3678            .on_all_shards({
3679                let count = Arc::clone(&count);
3680                || async move { count.fetch_add(1, Ordering::Relaxed) }
3681            })
3682            .unwrap();
3683
3684        let _: std::thread::ThreadId = handles.handles[0].thread().id();
3685
3686        assert_eq!(nr_cpus, handles.handles().iter().count());
3687
3688        let mut fut_output = handles
3689            .join_all()
3690            .into_iter()
3691            .map(Result::unwrap)
3692            .collect::<Vec<_>>();
3693
3694        fut_output.sort_unstable();
3695
3696        assert_eq!(fut_output, (0..nr_cpus).collect::<Vec<_>>());
3697
3698        assert_eq!(nr_cpus, count.load(Ordering::Relaxed));
3699    }
3700
3701    #[test]
3702    fn executor_invalid_executor_count() {
3703        assert!(LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(0))
3704            .on_all_shards(|| async move {})
3705            .is_err());
3706    }
3707
3708    #[test]
3709    fn executor_pool_builder_placements() {
3710        let cpu_set = CpuSet::online().unwrap();
3711        assert!(!cpu_set.is_empty());
3712
3713        for nn in 1..2 {
3714            let nr_execs = nn * cpu_set.len();
3715            let mut placements = vec![
3716                PoolPlacement::Unbound(nr_execs),
3717                PoolPlacement::Fenced(nr_execs, cpu_set.clone()),
3718                PoolPlacement::MaxSpread(nr_execs, None),
3719                PoolPlacement::MaxSpread(nr_execs, Some(cpu_set.clone())),
3720                PoolPlacement::MaxPack(nr_execs, None),
3721                PoolPlacement::MaxPack(nr_execs, Some(cpu_set.clone())),
3722            ];
3723
3724            for pp in placements.drain(..) {
3725                let ids = Arc::new(Mutex::new(HashMap::new()));
3726                let cpus = Arc::new(Mutex::new(HashMap::new()));
3727                let cpu_hard_bind =
3728                    !matches!(pp, PoolPlacement::Unbound(_) | PoolPlacement::Fenced(_, _));
3729
3730                let handles = LocalExecutorPoolBuilder::new(pp)
3731                    .on_all_shards({
3732                        let ids = Arc::clone(&ids);
3733                        let cpus = Arc::clone(&cpus);
3734                        || async move {
3735                            ids.lock()
3736                                .unwrap()
3737                                .entry(crate::executor().id())
3738                                .and_modify(|e| *e += 1)
3739                                .or_insert(1);
3740
3741                            let pid = nix::unistd::Pid::from_raw(0);
3742                            let cpu = nix::sched::sched_getaffinity(pid).unwrap();
3743                            cpus.lock()
3744                                .unwrap()
3745                                .entry(cpu)
3746                                .and_modify(|e| *e += 1)
3747                                .or_insert(1);
3748                        }
3749                    })
3750                    .unwrap();
3751
3752                assert_eq!(nr_execs, handles.handles().len());
3753                handles
3754                    .join_all()
3755                    .into_iter()
3756                    .for_each(|r| assert!(r.is_ok()));
3757
3758                assert_eq!(nr_execs, ids.lock().unwrap().len());
3759                ids.lock().unwrap().values().for_each(|v| assert_eq!(*v, 1));
3760
3761                if cpu_hard_bind {
3762                    assert_eq!(nr_execs, cpus.lock().unwrap().len());
3763                    cpus.lock()
3764                        .unwrap()
3765                        .values()
3766                        .for_each(|v| assert_eq!(*v, nn));
3767                }
3768            }
3769        }
3770    }
3771
3772    #[test]
3773    fn executor_pool_builder_shards_limit() {
3774        let cpu_set = CpuSet::online().unwrap();
3775        assert!(!cpu_set.is_empty());
3776
3777        // test: confirm that we can always get shards up to the # of cpus
3778        {
3779            let mut placements = vec![
3780                (false, PoolPlacement::Unbound(cpu_set.len())),
3781                (false, PoolPlacement::Fenced(cpu_set.len(), cpu_set.clone())),
3782                (true, PoolPlacement::MaxSpread(cpu_set.len(), None)),
3783                (
3784                    true,
3785                    PoolPlacement::MaxSpread(cpu_set.len(), Some(cpu_set.clone())),
3786                ),
3787                (true, PoolPlacement::MaxPack(cpu_set.len(), None)),
3788                (
3789                    true,
3790                    PoolPlacement::MaxPack(cpu_set.len(), Some(cpu_set.clone())),
3791                ),
3792            ];
3793
3794            for (_shard_limited, p) in placements.drain(..) {
3795                LocalExecutorPoolBuilder::new(p)
3796                    .on_all_shards(|| async move {})
3797                    .unwrap()
3798                    .join_all();
3799            }
3800        }
3801
3802        // test: confirm that some placements fail when shards are # of cpus + 1
3803        {
3804            let mut placements = vec![
3805                (false, PoolPlacement::Unbound(1 + cpu_set.len())),
3806                (
3807                    false,
3808                    PoolPlacement::Fenced(1 + cpu_set.len(), cpu_set.clone()),
3809                ),
3810                (true, PoolPlacement::MaxSpread(1 + cpu_set.len(), None)),
3811                (
3812                    true,
3813                    PoolPlacement::MaxSpread(1 + cpu_set.len(), Some(cpu_set.clone())),
3814                ),
3815                (true, PoolPlacement::MaxPack(1 + cpu_set.len(), None)),
3816                (
3817                    true,
3818                    PoolPlacement::MaxPack(1 + cpu_set.len(), Some(cpu_set)),
3819                ),
3820            ];
3821
3822            for (shard_limited, p) in placements.drain(..) {
3823                match LocalExecutorPoolBuilder::new(p).on_all_shards(|| async move {}) {
3824                    Ok(handles) => {
3825                        handles.join_all();
3826                        assert!(!shard_limited);
3827                    }
3828                    Err(_) => assert!(shard_limited),
3829                }
3830            }
3831        }
3832    }
3833
3834    #[test]
3835    fn scoped_task() {
3836        LocalExecutor::default().run(async {
3837            let mut a = 1;
3838            unsafe {
3839                crate::spawn_scoped_local(async {
3840                    a = 2;
3841                })
3842                .await;
3843            }
3844            crate::executor().yield_task_queue_now().await;
3845            assert_eq!(a, 2);
3846
3847            let mut a = 1;
3848            let do_later = unsafe {
3849                crate::spawn_scoped_local(async {
3850                    a = 2;
3851                })
3852            };
3853
3854            crate::executor().yield_task_queue_now().await;
3855            do_later.await;
3856            assert_eq!(a, 2);
3857        });
3858    }
3859
3860    #[test]
3861    fn executor_pool_builder_thread_panic() {
3862        let nr_execs = 8;
3863        let res = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_execs))
3864            .on_all_shards(|| async move { panic!("join handle will be Err") })
3865            .unwrap()
3866            .join_all();
3867
3868        assert_eq!(nr_execs, res.len());
3869        assert!(res.into_iter().all(|r| r.is_err()));
3870    }
3871
3872    #[test]
3873    fn executor_pool_builder_return_values() {
3874        let nr_execs = 8;
3875        let x = Arc::new(AtomicUsize::new(0));
3876        let mut values = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_execs))
3877            .on_all_shards(|| async move { x.fetch_add(1, Ordering::Relaxed) })
3878            .unwrap()
3879            .join_all()
3880            .into_iter()
3881            .map(Result::unwrap)
3882            .collect::<Vec<_>>();
3883
3884        values.sort_unstable();
3885        assert_eq!(values, (0..nr_execs).collect::<Vec<_>>());
3886    }
3887
3888    #[test]
3889    fn executor_pool_builder_spawn_cancel() {
3890        let nr_shards = 8;
3891        let builder = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(nr_shards));
3892        let nr_exectuted = Arc::new(AtomicUsize::new(0));
3893
3894        let fut_gen = {
3895            let nr_exectuted = Arc::clone(&nr_exectuted);
3896            || async move {
3897                nr_exectuted.fetch_add(1, Ordering::Relaxed);
3898                unreachable!("should not execute")
3899            }
3900        };
3901
3902        let mut handles = PoolThreadHandles::new();
3903        let mut cpu_set_gen = placement::CpuSetGenerator::pool(builder.placement.clone()).unwrap();
3904        let latch = Latch::new(builder.placement.executor_count());
3905
3906        let ii_cxl = 2;
3907        for ii in 0..builder.placement.executor_count() {
3908            if ii == nr_shards - ii_cxl {
3909                std::thread::sleep(std::time::Duration::from_millis(100));
3910                assert!(ii_cxl <= latch.cancel().unwrap());
3911            }
3912            match builder.spawn_thread(&mut cpu_set_gen, &latch, fut_gen.clone()) {
3913                Ok(handle) => handles.push(handle),
3914                Err(_) => break,
3915            }
3916        }
3917
3918        assert_eq!(0, nr_exectuted.load(Ordering::Relaxed));
3919        assert_eq!(nr_shards, handles.handles.len());
3920        handles.join_all().into_iter().for_each(|s| {
3921            assert!(format!("{}", s.unwrap_err()).contains("spawn failed"));
3922        });
3923    }
3924
3925    #[should_panic]
3926    #[test]
3927    fn executor_inception() {
3928        LocalExecutor::default().run(async {
3929            LocalExecutor::default().run(async {});
3930        });
3931    }
3932
    // State shared between the two tasks of the refcount-underflow regression
    // tests below.
    enum TaskState {
        // The waiting task has not been released yet. Holds the waiter's
        // waker once it has been polled, `None` before that.
        Pending(Option<Waker>),
        // The waiting task may complete on its next poll.
        Ready,
    }
3937
3938    // following four tests are regression ones for https://github.com/DataDog/glommio/issues/379.
3939    // here we test against task reference count underflow
3940    // test includes two scenarios, with join handles and with sleep, for each case
3941    // we test both, wake and wake_by_ref
3942    #[test]
3943    fn wake_by_ref_refcount_underflow_with_join_handle() {
3944        LocalExecutor::default().run(async {
3945            let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
3946            let cloned_slot = slot.clone();
3947            let jh = crate::spawn_local(async move {
3948                // first task, places waker of self into slot, when polled checks for result, if
3949                // it's ready, returns Ready, otherwise return Pending
3950                poll_fn::<(), _>(|cx| {
3951                    let current = &mut *cloned_slot.borrow_mut();
3952                    match current {
3953                        TaskState::Pending(maybe_waker) => match maybe_waker {
3954                            Some(_) => unreachable!(),
3955                            None => {
3956                                *current = TaskState::Pending(Some(cx.waker().clone()));
3957                                Poll::Pending
3958                            }
3959                        },
3960                        TaskState::Ready => Poll::Ready(()),
3961                    }
3962                })
3963                .await;
3964            })
3965            .detach();
3966            let jh2 = crate::spawn_local(async move {
3967                // second task, checks slot for first task waker, wakes it by ref, and then it
3968                // is dropped.
3969                let current = &mut *slot.borrow_mut();
3970                match current {
3971                    TaskState::Pending(maybe_waker) => {
3972                        let waker = maybe_waker.take().unwrap();
3973                        waker.wake_by_ref();
3974                        *current = TaskState::Ready; // <-- waker dropped here, refcount is zero
3975                    }
3976                    TaskState::Ready => unreachable!(), // task cannot be ready at this time
3977                }
3978            })
3979            .detach();
3980            join_all(vec![jh, jh2]).await;
3981        });
3982    }
3983
3984    #[test]
3985    fn wake_by_ref_refcount_underflow_with_sleep() {
3986        LocalExecutor::default().run(async {
3987            let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
3988            let cloned_slot = slot.clone();
3989            crate::spawn_local(async move {
3990                poll_fn::<(), _>(|cx| {
3991                    let current = &mut *cloned_slot.borrow_mut();
3992                    match current {
3993                        TaskState::Pending(maybe_waker) => match maybe_waker {
3994                            Some(_) => unreachable!(),
3995                            None => {
3996                                *current = TaskState::Pending(Some(cx.waker().clone()));
3997                                Poll::Pending
3998                            }
3999                        },
4000                        TaskState::Ready => Poll::Ready(()),
4001                    }
4002                })
4003                .await;
4004            })
4005            .detach();
4006            crate::spawn_local(async move {
4007                let current = &mut *slot.borrow_mut();
4008                match current {
4009                    TaskState::Pending(maybe_waker) => {
4010                        let waker = maybe_waker.take().unwrap();
4011                        waker.wake_by_ref();
4012                        *current = TaskState::Ready;
4013                    }
4014                    TaskState::Ready => unreachable!(),
4015                }
4016            })
4017            .detach();
4018            timer::sleep(Duration::from_millis(1)).await;
4019        });
4020    }
4021
4022    #[test]
4023    fn wake_refcount_underflow_with_join_handle() {
4024        LocalExecutor::default().run(async {
4025            let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
4026            let cloned_slot = slot.clone();
4027            let jh = crate::spawn_local(async move {
4028                poll_fn::<(), _>(|cx| {
4029                    let current = &mut *cloned_slot.borrow_mut();
4030                    match current {
4031                        TaskState::Pending(maybe_waker) => match maybe_waker {
4032                            Some(_) => unreachable!(),
4033                            None => {
4034                                *current = TaskState::Pending(Some(cx.waker().clone()));
4035                                Poll::Pending
4036                            }
4037                        },
4038                        TaskState::Ready => Poll::Ready(()),
4039                    }
4040                })
4041                .await;
4042            })
4043            .detach();
4044            let jh2 = crate::spawn_local(async move {
4045                let current = &mut *slot.borrow_mut();
4046                match current {
4047                    TaskState::Pending(maybe_waker) => {
4048                        let waker = maybe_waker.take().unwrap();
4049                        waker.wake();
4050                        *current = TaskState::Ready;
4051                    }
4052                    TaskState::Ready => unreachable!(),
4053                }
4054            })
4055            .detach();
4056            join_all(vec![jh, jh2]).await;
4057        });
4058    }
4059
4060    #[test]
4061    fn wake_refcount_underflow_with_sleep() {
4062        LocalExecutor::default().run(async {
4063            let slot: Rc<RefCell<TaskState>> = Rc::new(RefCell::new(TaskState::Pending(None)));
4064            let cloned_slot = slot.clone();
4065            crate::spawn_local(async move {
4066                poll_fn::<(), _>(|cx| {
4067                    let current = &mut *cloned_slot.borrow_mut();
4068                    match current {
4069                        TaskState::Pending(maybe_waker) => match maybe_waker {
4070                            Some(_) => unreachable!(),
4071                            None => {
4072                                *current = TaskState::Pending(Some(cx.waker().clone()));
4073                                Poll::Pending
4074                            }
4075                        },
4076                        TaskState::Ready => Poll::Ready(()),
4077                    }
4078                })
4079                .await;
4080            })
4081            .detach();
4082            crate::spawn_local(async move {
4083                let current = &mut *slot.borrow_mut();
4084                match current {
4085                    TaskState::Pending(maybe_waker) => {
4086                        let waker = maybe_waker.take().unwrap();
4087                        waker.wake();
4088                        *current = TaskState::Ready;
4089                    }
4090                    TaskState::Ready => unreachable!(),
4091                }
4092            })
4093            .detach();
4094            timer::sleep(Duration::from_millis(1)).await;
4095        });
4096    }
4097
4098    #[test]
4099    fn blocking_function() {
4100        LocalExecutor::default().run(async {
4101            let started = Instant::now();
4102
4103            let blocking = executor().spawn_blocking(enclose!((started) move || {
4104                let now = Instant::now();
4105                while now.elapsed() < Duration::from_millis(100) {}
4106                started.elapsed()
4107            }));
4108            let coop = enclose!((started) async move {
4109                let now = Instant::now();
4110                while now.elapsed() < Duration::from_millis(100) {
4111                    yield_if_needed().await;
4112                }
4113                started.elapsed()
4114            });
4115
4116            let (blocking, coop) = join(blocking, coop).await;
4117
4118            assert!(blocking.as_millis() >= 100 && blocking.as_millis() < 150);
4119            assert!(coop.as_millis() >= 100 && coop.as_millis() < 150);
4120        });
4121    }
4122
4123    #[test]
4124    fn blocking_function_parallelism() {
4125        LocalExecutorBuilder::new(Placement::Unbound)
4126            .blocking_thread_pool_placement(PoolPlacement::Unbound(4))
4127            .spawn(|| async {
4128                let started = Instant::now();
4129                let mut blocking = vec![];
4130
4131                for _ in 0..5 {
4132                    blocking.push(executor().spawn_blocking(enclose!((started) move || {
4133                        let now = Instant::now();
4134                        while now.elapsed() < Duration::from_millis(100) {}
4135                        started.elapsed()
4136                    })));
4137                }
4138
4139                // we created 5 blocking jobs each taking 100ms but our thread pool only has 4
4140                // threads. We expect one of those jobs to take twice as long as the others.
4141
4142                let mut ts = join_all(blocking.into_iter()).await;
4143                assert_eq!(ts.len(), 5);
4144
4145                ts.sort_unstable();
4146                for ts in ts.iter().take(4) {
4147                    assert!(ts.as_millis() >= 100 && ts.as_millis() < 150);
4148                }
4149                assert!(ts[4].as_millis() >= 200 && ts[4].as_millis() < 250);
4150            })
4151            .unwrap()
4152            .join()
4153            .unwrap();
4154    }
4155
4156    #[test]
4157    fn blocking_function_placement_independent_of_executor_placement() {
4158        let affinity = nix::sched::sched_getaffinity(nix::unistd::Pid::from_raw(0)).unwrap();
4159        let num_cpus_accessible_by_default = (0..nix::sched::CpuSet::count())
4160            .map(|cpu| affinity.is_set(cpu).unwrap() as usize)
4161            .sum::<usize>();
4162        if num_cpus_accessible_by_default < 2 {
4163            eprintln!(
4164                "Insufficient CPUs available to test blocking_function_placement_independent_of_executor_placement (affinity only allows for {})",
4165                num_cpus_accessible_by_default,
4166            );
4167            return;
4168        }
4169
4170        let num_schedulable_cpus = LocalExecutorBuilder::new(Placement::Fixed(0))
4171            .blocking_thread_pool_placement(PoolPlacement::Unbound(2))
4172            .spawn(|| async {
4173                executor()
4174                    .spawn_blocking(move || {
4175                        let pid = nix::unistd::Pid::from_raw(0);
4176                        let affinity =
4177                            nix::sched::sched_getaffinity(pid).expect("Failed to get affinity");
4178                        (0..nix::sched::CpuSet::count())
4179                            .map(|cpu| {
4180                                affinity
4181                                    .is_set(cpu)
4182                                    .expect("Failed to check if cpu affinity is set")
4183                                    as usize
4184                            })
4185                            .sum::<usize>()
4186                    })
4187                    .await
4188            })
4189            .unwrap()
4190            .join()
4191            .unwrap();
4192
4193        assert!(
4194            num_schedulable_cpus >= num_cpus_accessible_by_default,
4195            "num schedulable {}, num cpus accessible {}",
4196            num_schedulable_cpus,
4197            num_cpus_accessible_by_default,
4198        );
4199    }
4200
4201    #[test]
4202    fn blocking_pool_invalid_placement() {
4203        let ret = LocalExecutorBuilder::new(Placement::Unbound)
4204            .blocking_thread_pool_placement(PoolPlacement::Unbound(0))
4205            .spawn(|| async {})
4206            .unwrap()
4207            .join();
4208        assert!(ret.is_err());
4209    }
4210
4211    #[test]
4212    fn local_executor_unset() {
4213        LocalExecutor::default().run(async {});
4214
4215        #[cfg(not(feature = "native-tls"))]
4216        assert!(!LOCAL_EX.is_set());
4217
4218        #[cfg(feature = "native-tls")]
4219        assert!(unsafe { LOCAL_EX.is_null() });
4220    }
4221
4222    #[test]
4223    fn local_executor_unset_when_panic() {
4224        let res = std::panic::catch_unwind(|| {
4225            LocalExecutor::default().run(async {
4226                panic!("uh oh!");
4227            });
4228        });
4229        assert!(res.is_err());
4230
4231        #[cfg(not(feature = "native-tls"))]
4232        assert!(!LOCAL_EX.is_set());
4233
4234        #[cfg(feature = "native-tls")]
4235        assert!(unsafe { LOCAL_EX.is_null() });
4236    }
4237}