tokio_current_thread/
lib.rs

1#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.7")]
2#![deny(missing_docs, missing_debug_implementations)]
3
4//! A single-threaded executor which executes tasks on the same thread from which
5//! they are spawned.
6//!
7//! > **Note:** This crate is **deprecated in tokio 0.2.x** and has been moved
8//! > and refactored into various places in the [`tokio`] crate. The closest
9//! replacement is to make use of [`tokio::task::LocalSet::block_on`] which
10//! requires the [`rt-util` feature].
11//!
12//! [`tokio`]: https://docs.rs/tokio/latest/tokio/index.html
13//! [`tokio::task::LocalSet::block_on`]: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#method.block_on
14//! [`rt-util` feature]: https://docs.rs/tokio/latest/tokio/index.html#feature-flags
15//!
16//! The crate provides:
17//!
18//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread.
19//!   The easiest way to start a new [`CurrentThread`] executor is to call
20//!   [`block_on_all`] with an initial task to seed the executor.
21//!   All tasks that are being managed by a [`CurrentThread`] executor are able to
22//!   spawn additional tasks by calling [`spawn`].
23//!
24//!
25//! Application authors will not use this crate directly. Instead, they will use the
26//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they
27//! are building a custom task executor.
28//!
29//! For more details, see [executor module] documentation in the Tokio crate.
30//!
31//! [`CurrentThread`]: struct.CurrentThread.html
32//! [`spawn`]: fn.spawn.html
33//! [`block_on_all`]: fn.block_on_all.html
34//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html
35
36extern crate futures;
37extern crate tokio_executor;
38
39mod scheduler;
40
41use self::scheduler::Scheduler;
42
43use tokio_executor::park::{Park, ParkThread, Unpark};
44use tokio_executor::{Enter, SpawnError};
45
46use futures::future::{ExecuteError, ExecuteErrorKind, Executor};
47use futures::{executor, Async, Future};
48
49use std::cell::Cell;
50use std::error::Error;
51use std::fmt;
52use std::rc::Rc;
53use std::sync::{atomic, mpsc, Arc};
54use std::thread;
55use std::time::{Duration, Instant};
56
57/// Executes tasks on the current thread
58pub struct CurrentThread<P: Park = ParkThread> {
59    /// Execute futures and receive unpark notifications.
60    scheduler: Scheduler<P::Unpark>,
61
62    /// Current number of futures being executed.
63    ///
64    /// The LSB is used to indicate that the runtime is preparing to shut down.
65    /// Thus, to get the actual number of pending futures, `>>1`.
66    num_futures: Arc<atomic::AtomicUsize>,
67
68    /// Thread park handle
69    park: P,
70
71    /// Handle for spawning new futures from other threads
72    spawn_handle: Handle,
73
74    /// Receiver for futures spawned from other threads
75    spawn_receiver: mpsc::Receiver<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>,
76
77    /// The thread-local ID assigned to this executor.
78    id: u64,
79}
80
/// Spawns futures onto the executor running on the current thread.
///
/// Every future submitted through this type is executed on the current
/// thread, which means `run` does not return until those futures have
/// completed.
///
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
pub struct TaskExecutor {
    // `Rc` is neither `Send` nor `Sync`, so this marker pins the value to the
    // thread it was created on.
    _p: std::marker::PhantomData<Rc<()>>,
}
93
/// Outcome of a single call to `turn`.
#[derive(Debug)]
pub struct Turn {
    /// Whether the iteration polled at least one future.
    polled: bool,
}

impl Turn {
    /// Reports whether the iteration polled any future: `true` if at least
    /// one was polled, `false` if none were.
    pub fn has_polled(&self) -> bool {
        let Turn { polled } = *self;
        polled
    }
}
106
107/// A `CurrentThread` instance bound to a supplied execution context.
108pub struct Entered<'a, P: Park + 'a> {
109    executor: &'a mut CurrentThread<P>,
110    enter: &'a mut Enter,
111}
112
/// Error produced when `run` fails.
#[derive(Debug)]
pub struct RunError {
    // Private unit field: keeps the type from being constructed outside
    // this crate.
    _p: (),
}

impl fmt::Display for RunError {
    // `Error::description` is deprecated in newer std releases, but it stays
    // the single source of the message text here.
    #[allow(deprecated)]
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(self.description())
    }
}

impl Error for RunError {
    /// Fixed, human-readable summary of the error.
    fn description(&self) -> &str {
        "Run error"
    }
}
130
/// Error produced when `run_timeout` fails.
#[derive(Debug)]
pub struct RunTimeoutError {
    /// `true` when the failure was caused by the timeout elapsing.
    timeout: bool,
}

impl fmt::Display for RunTimeoutError {
    // `Error::description` is deprecated in newer std releases, but it stays
    // the single source of the message text here.
    #[allow(deprecated)]
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(self.description())
    }
}

impl Error for RunTimeoutError {
    /// Summary of the error, distinguishing a timeout from other failures.
    fn description(&self) -> &str {
        match self.timeout {
            true => "Run timeout error (timeout)",
            false => "Run timeout error (not timeout)",
        }
    }
}
152
/// Error produced when `turn` fails.
#[derive(Debug)]
pub struct TurnError {
    // Private unit field: keeps the type from being constructed outside
    // this crate.
    _p: (),
}

impl fmt::Display for TurnError {
    // `Error::description` is deprecated in newer std releases, but it stays
    // the single source of the message text here.
    #[allow(deprecated)]
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(self.description())
    }
}

impl Error for TurnError {
    /// Fixed, human-readable summary of the error.
    fn description(&self) -> &str {
        "Turn error"
    }
}
170
/// Error produced when `block_on` fails.
#[derive(Debug)]
pub struct BlockError<T> {
    /// The error yielded by the future itself, or `None` when the failure
    /// did not come from the future.
    inner: Option<T>,
}

impl<T> fmt::Display for BlockError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("Block error")
    }
}

impl<T> Error for BlockError<T>
where
    T: fmt::Debug,
{
    /// Fixed, human-readable summary of the error.
    fn description(&self) -> &str {
        "Block error"
    }
}
188
189/// This is mostly split out to make the borrow checker happy.
190struct Borrow<'a, U: 'a> {
191    id: u64,
192    scheduler: &'a mut Scheduler<U>,
193    num_futures: &'a atomic::AtomicUsize,
194}
195
196trait SpawnLocal {
197    fn spawn_local(
198        &mut self,
199        future: Box<dyn Future<Item = (), Error = ()>>,
200        already_counted: bool,
201    );
202}
203
204struct CurrentRunner {
205    spawn: Cell<Option<*mut dyn SpawnLocal>>,
206    id: Cell<Option<u64>>,
207}
208
209thread_local! {
210    /// Current thread's task runner. This is set in `TaskRunner::with`
211    static CURRENT: CurrentRunner = CurrentRunner {
212        spawn: Cell::new(None),
213        id: Cell::new(None),
214    }
215}
216
217thread_local! {
218    /// Unique ID to assign to each new executor launched on this thread.
219    ///
220    /// The unique ID is used to determine if the currently running executor matches the one
221    /// referred to by a `Handle` so that direct task dispatch can be used.
222    static EXECUTOR_ID: Cell<u64> = Cell::new(0)
223}
224
225/// Run the executor bootstrapping the execution with the provided future.
226///
227/// This creates a new [`CurrentThread`] executor, spawns the provided future,
228/// and blocks the current thread until the provided future and **all**
229/// subsequently spawned futures complete. In other words:
230///
231/// * If the provided bootstrap future does **not** spawn any additional tasks,
232///   `block_on_all` returns once `future` completes.
233/// * If the provided bootstrap future **does** spawn additional tasks, then
234///   `block_on_all` returns once **all** spawned futures complete.
235///
236/// See [module level][mod] documentation for more details.
237///
238/// [`CurrentThread`]: struct.CurrentThread.html
239/// [mod]: index.html
240pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
241where
242    F: Future,
243{
244    let mut current_thread = CurrentThread::new();
245
246    let ret = current_thread.block_on(future);
247    current_thread.run().unwrap();
248
249    ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
250}
251
252/// Executes a future on the current thread.
253///
254/// The provided future must complete or be canceled before `run` will return.
255///
256/// Unlike [`tokio::spawn`], this function will always spawn on a
257/// `CurrentThread` executor and is able to spawn futures that are not `Send`.
258///
259/// # Panics
260///
261/// This function can only be invoked from the context of a `run` call; any
262/// other use will result in a panic.
263///
264/// [`tokio::spawn`]: ../fn.spawn.html
265pub fn spawn<F>(future: F)
266where
267    F: Future<Item = (), Error = ()> + 'static,
268{
269    TaskExecutor::current()
270        .spawn_local(Box::new(future))
271        .unwrap();
272}
273
274// ===== impl CurrentThread =====
275
276impl CurrentThread<ParkThread> {
277    /// Create a new instance of `CurrentThread`.
278    pub fn new() -> Self {
279        CurrentThread::new_with_park(ParkThread::new())
280    }
281}
282
283impl<P: Park> CurrentThread<P> {
284    /// Create a new instance of `CurrentThread` backed by the given park
285    /// handle.
286    pub fn new_with_park(park: P) -> Self {
287        let unpark = park.unpark();
288
289        let (spawn_sender, spawn_receiver) = mpsc::channel();
290        let thread = thread::current().id();
291        let id = EXECUTOR_ID.with(|idc| {
292            let id = idc.get();
293            idc.set(id + 1);
294            id
295        });
296
297        let scheduler = Scheduler::new(unpark);
298        let notify = scheduler.notify();
299
300        let num_futures = Arc::new(atomic::AtomicUsize::new(0));
301
302        CurrentThread {
303            scheduler: scheduler,
304            num_futures: num_futures.clone(),
305            park,
306            id,
307            spawn_handle: Handle {
308                sender: spawn_sender,
309                num_futures: num_futures,
310                notify: notify,
311                shut_down: Cell::new(false),
312                thread: thread,
313                id,
314            },
315            spawn_receiver: spawn_receiver,
316        }
317    }
318
319    /// Returns `true` if the executor is currently idle.
320    ///
321    /// An idle executor is defined by not currently having any spawned tasks.
322    ///
323    /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`,
324    /// this method may return `true` even though there are more futures to be executed.
325    pub fn is_idle(&self) -> bool {
326        self.num_futures.load(atomic::Ordering::SeqCst) <= 1
327    }
328
329    /// Spawn the future on the executor.
330    ///
331    /// This internally queues the future to be executed once `run` is called.
332    pub fn spawn<F>(&mut self, future: F) -> &mut Self
333    where
334        F: Future<Item = (), Error = ()> + 'static,
335    {
336        self.borrow().spawn_local(Box::new(future), false);
337        self
338    }
339
340    /// Synchronously waits for the provided `future` to complete.
341    ///
342    /// This function can be used to synchronously block the current thread
343    /// until the provided `future` has resolved either successfully or with an
344    /// error. The result of the future is then returned from this function
345    /// call.
346    ///
347    /// Note that this function will **also** execute any spawned futures on the
348    /// current thread, but will **not** block until these other spawned futures
349    /// have completed.
350    ///
351    /// The caller is responsible for ensuring that other spawned futures
352    /// complete execution.
353    pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
354    where
355        F: Future,
356    {
357        let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
358        self.enter(&mut enter).block_on(future)
359    }
360
361    /// Run the executor to completion, blocking the thread until **all**
362    /// spawned futures have completed.
363    pub fn run(&mut self) -> Result<(), RunError> {
364        let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
365        self.enter(&mut enter).run()
366    }
367
368    /// Run the executor to completion, blocking the thread until all
369    /// spawned futures have completed **or** `duration` time has elapsed.
370    pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
371        let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
372        self.enter(&mut enter).run_timeout(duration)
373    }
374
375    /// Perform a single iteration of the event loop.
376    ///
377    /// This function blocks the current thread even if the executor is idle.
378    pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
379        let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
380        self.enter(&mut enter).turn(duration)
381    }
382
383    /// Bind `CurrentThread` instance with an execution context.
384    pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> {
385        Entered {
386            executor: self,
387            enter,
388        }
389    }
390
391    /// Returns a reference to the underlying `Park` instance.
392    pub fn get_park(&self) -> &P {
393        &self.park
394    }
395
396    /// Returns a mutable reference to the underlying `Park` instance.
397    pub fn get_park_mut(&mut self) -> &mut P {
398        &mut self.park
399    }
400
401    fn borrow(&mut self) -> Borrow<P::Unpark> {
402        Borrow {
403            id: self.id,
404            scheduler: &mut self.scheduler,
405            num_futures: &*self.num_futures,
406        }
407    }
408
409    /// Get a new handle to spawn futures on the executor
410    ///
411    /// Different to the executor itself, the handle can be sent to different
412    /// threads and can be used to spawn futures on the executor.
413    pub fn handle(&self) -> Handle {
414        self.spawn_handle.clone()
415    }
416}
417
418impl<P: Park> Drop for CurrentThread<P> {
419    fn drop(&mut self) {
420        // Signal to Handles that no more futures can be spawned by setting LSB.
421        //
422        // NOTE: this isn't technically necessary since the send on the mpsc will fail once the
423        // receiver is dropped, but it's useful to illustrate how clean shutdown will be
424        // implemented (e.g., by setting the LSB).
425        let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
426
427        // TODO: We currently ignore any pending futures at the time we shut down.
428        //
429        // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`)
430        // which sets LSB (as above) do make Handle::spawn stop working, and then runs until
431        // num_futures.load() == 1.
432        let _ = pending;
433    }
434}
435
436impl tokio_executor::Executor for CurrentThread {
437    fn spawn(
438        &mut self,
439        future: Box<dyn Future<Item = (), Error = ()> + Send>,
440    ) -> Result<(), SpawnError> {
441        self.borrow().spawn_local(future, false);
442        Ok(())
443    }
444}
445
446impl<T> tokio_executor::TypedExecutor<T> for CurrentThread
447where
448    T: Future<Item = (), Error = ()> + 'static,
449{
450    fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
451        self.borrow().spawn_local(Box::new(future), false);
452        Ok(())
453    }
454}
455
456impl<P: Park> fmt::Debug for CurrentThread<P> {
457    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
458        fmt.debug_struct("CurrentThread")
459            .field("scheduler", &self.scheduler)
460            .field(
461                "num_futures",
462                &self.num_futures.load(atomic::Ordering::SeqCst),
463            )
464            .finish()
465    }
466}
467
468// ===== impl Entered =====
469
470impl<'a, P: Park> Entered<'a, P> {
471    /// Spawn the future on the executor.
472    ///
473    /// This internally queues the future to be executed once `run` is called.
474    pub fn spawn<F>(&mut self, future: F) -> &mut Self
475    where
476        F: Future<Item = (), Error = ()> + 'static,
477    {
478        self.executor.borrow().spawn_local(Box::new(future), false);
479        self
480    }
481
482    /// Synchronously waits for the provided `future` to complete.
483    ///
484    /// This function can be used to synchronously block the current thread
485    /// until the provided `future` has resolved either successfully or with an
486    /// error. The result of the future is then returned from this function
487    /// call.
488    ///
489    /// Note that this function will **also** execute any spawned futures on the
490    /// current thread, but will **not** block until these other spawned futures
491    /// have completed.
492    ///
493    /// The caller is responsible for ensuring that other spawned futures
494    /// complete execution.
495    pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
496    where
497        F: Future,
498    {
499        let mut future = executor::spawn(future);
500        let notify = self.executor.scheduler.notify();
501
502        loop {
503            let res = self
504                .executor
505                .borrow()
506                .enter(self.enter, || future.poll_future_notify(&notify, 0));
507
508            match res {
509                Ok(Async::Ready(e)) => return Ok(e),
510                Err(e) => return Err(BlockError { inner: Some(e) }),
511                Ok(Async::NotReady) => {}
512            }
513
514            self.tick();
515
516            if let Err(_) = self.executor.park.park() {
517                return Err(BlockError { inner: None });
518            }
519        }
520    }
521
522    /// Run the executor to completion, blocking the thread until **all**
523    /// spawned futures have completed.
524    pub fn run(&mut self) -> Result<(), RunError> {
525        self.run_timeout2(None).map_err(|_| RunError { _p: () })
526    }
527
528    /// Run the executor to completion, blocking the thread until all
529    /// spawned futures have completed **or** `duration` time has elapsed.
530    pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
531        self.run_timeout2(Some(duration))
532    }
533
534    /// Perform a single iteration of the event loop.
535    ///
536    /// This function blocks the current thread even if the executor is idle.
537    pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
538        let res = if self.executor.scheduler.has_pending_futures() {
539            self.executor.park.park_timeout(Duration::from_millis(0))
540        } else {
541            match duration {
542                Some(duration) => self.executor.park.park_timeout(duration),
543                None => self.executor.park.park(),
544            }
545        };
546
547        if res.is_err() {
548            return Err(TurnError { _p: () });
549        }
550
551        let polled = self.tick();
552
553        Ok(Turn { polled })
554    }
555
556    /// Returns a reference to the underlying `Park` instance.
557    pub fn get_park(&self) -> &P {
558        &self.executor.park
559    }
560
561    /// Returns a mutable reference to the underlying `Park` instance.
562    pub fn get_park_mut(&mut self) -> &mut P {
563        &mut self.executor.park
564    }
565
566    fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> {
567        if self.executor.is_idle() {
568            // Nothing to do
569            return Ok(());
570        }
571
572        let mut time = dur.map(|dur| (Instant::now() + dur, dur));
573
574        loop {
575            self.tick();
576
577            if self.executor.is_idle() {
578                return Ok(());
579            }
580
581            match time {
582                Some((until, rem)) => {
583                    if let Err(_) = self.executor.park.park_timeout(rem) {
584                        return Err(RunTimeoutError::new(false));
585                    }
586
587                    let now = Instant::now();
588
589                    if now >= until {
590                        return Err(RunTimeoutError::new(true));
591                    }
592
593                    time = Some((until, until - now));
594                }
595                None => {
596                    if let Err(_) = self.executor.park.park() {
597                        return Err(RunTimeoutError::new(false));
598                    }
599                }
600            }
601        }
602    }
603
604    /// Returns `true` if any futures were processed
605    fn tick(&mut self) -> bool {
606        // Spawn any futures that were spawned from other threads by manually
607        // looping over the receiver stream
608
609        // FIXME: Slightly ugly but needed to make the borrow checker happy
610        let (mut borrow, spawn_receiver) = (
611            Borrow {
612                id: self.executor.id,
613                scheduler: &mut self.executor.scheduler,
614                num_futures: &*self.executor.num_futures,
615            },
616            &mut self.executor.spawn_receiver,
617        );
618
619        while let Ok(future) = spawn_receiver.try_recv() {
620            borrow.spawn_local(future, true);
621        }
622
623        // After any pending futures were scheduled, do the actual tick
624        borrow
625            .scheduler
626            .tick(borrow.id, &mut *self.enter, borrow.num_futures)
627    }
628}
629
630impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
631    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
632        fmt.debug_struct("Entered")
633            .field("executor", &self.executor)
634            .field("enter", &self.enter)
635            .finish()
636    }
637}
638
639// ===== impl Handle =====
640
641/// Handle to spawn a future on the corresponding `CurrentThread` instance
642#[derive(Clone)]
643pub struct Handle {
644    sender: mpsc::Sender<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>,
645    num_futures: Arc<atomic::AtomicUsize>,
646    shut_down: Cell<bool>,
647    notify: executor::NotifyHandle,
648    thread: thread::ThreadId,
649
650    /// The thread-local ID assigned to this Handle's executor.
651    id: u64,
652}
653
654// Manual implementation because the Sender does not implement Debug
655impl fmt::Debug for Handle {
656    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
657        fmt.debug_struct("Handle")
658            .field("shut_down", &self.shut_down.get())
659            .finish()
660    }
661}
662
663impl Handle {
664    /// Spawn a future onto the `CurrentThread` instance corresponding to this handle
665    ///
666    /// # Panics
667    ///
668    /// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
669    /// instance of the `Handle` does not exist anymore.
670    pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
671    where
672        F: Future<Item = (), Error = ()> + Send + 'static,
673    {
674        if thread::current().id() == self.thread {
675            let mut e = TaskExecutor::current();
676            if e.id() == Some(self.id) {
677                return e.spawn_local(Box::new(future));
678            }
679        }
680
681        if self.shut_down.get() {
682            return Err(SpawnError::shutdown());
683        }
684
685        // NOTE: += 2 since LSB is the shutdown bit
686        let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
687        if pending % 2 == 1 {
688            // Bring the count back so we still know when the Runtime is idle.
689            self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst);
690
691            // Once the Runtime is shutting down, we know it won't come back.
692            self.shut_down.set(true);
693
694            return Err(SpawnError::shutdown());
695        }
696
697        self.sender
698            .send(Box::new(future))
699            .expect("CurrentThread does not exist anymore");
700        // use 0 for the id, CurrentThread does not make use of it
701        self.notify.notify(0);
702        Ok(())
703    }
704
705    /// Provides a best effort **hint** to whether or not `spawn` will succeed.
706    ///
707    /// This function may return both false positives **and** false negatives.
708    /// If `status` returns `Ok`, then a call to `spawn` will *probably*
709    /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
710    /// *probably* fail, but may succeed.
711    ///
712    /// This allows a caller to avoid creating the task if the call to `spawn`
713    /// has a high likelihood of failing.
714    pub fn status(&self) -> Result<(), SpawnError> {
715        if self.shut_down.get() {
716            return Err(SpawnError::shutdown());
717        }
718
719        Ok(())
720    }
721}
722
723// ===== impl TaskExecutor =====
724
725impl TaskExecutor {
726    /// Returns an executor that executes futures on the current thread.
727    ///
728    /// The user of `TaskExecutor` must ensure that when a future is submitted,
729    /// that it is done within the context of a call to `run`.
730    ///
731    /// For more details, see the [module level](index.html) documentation.
732    pub fn current() -> TaskExecutor {
733        TaskExecutor {
734            _p: ::std::marker::PhantomData,
735        }
736    }
737
738    /// Get the current executor's thread-local ID.
739    fn id(&self) -> Option<u64> {
740        CURRENT.with(|current| current.id.get())
741    }
742
743    /// Spawn a future onto the current `CurrentThread` instance.
744    pub fn spawn_local(
745        &mut self,
746        future: Box<dyn Future<Item = (), Error = ()>>,
747    ) -> Result<(), SpawnError> {
748        CURRENT.with(|current| match current.spawn.get() {
749            Some(spawn) => {
750                unsafe { (*spawn).spawn_local(future, false) };
751                Ok(())
752            }
753            None => Err(SpawnError::shutdown()),
754        })
755    }
756}
757
758impl tokio_executor::Executor for TaskExecutor {
759    fn spawn(
760        &mut self,
761        future: Box<dyn Future<Item = (), Error = ()> + Send>,
762    ) -> Result<(), SpawnError> {
763        self.spawn_local(future)
764    }
765}
766
767impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor
768where
769    F: Future<Item = (), Error = ()> + 'static,
770{
771    fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
772        self.spawn_local(Box::new(future))
773    }
774}
775
776impl<F> Executor<F> for TaskExecutor
777where
778    F: Future<Item = (), Error = ()> + 'static,
779{
780    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
781        CURRENT.with(|current| match current.spawn.get() {
782            Some(spawn) => {
783                unsafe { (*spawn).spawn_local(Box::new(future), false) };
784                Ok(())
785            }
786            None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)),
787        })
788    }
789}
790
791// ===== impl Borrow =====
792
793impl<'a, U: Unpark> Borrow<'a, U> {
794    fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R
795    where
796        F: FnOnce() -> R,
797    {
798        CURRENT.with(|current| {
799            current.id.set(Some(self.id));
800            current.set_spawn(self, || f())
801        })
802    }
803}
804
805impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
806    fn spawn_local(
807        &mut self,
808        future: Box<dyn Future<Item = (), Error = ()>>,
809        already_counted: bool,
810    ) {
811        if !already_counted {
812            // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down.
813            // NOTE: += 2 since LSB is the shutdown bit
814            self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
815        }
816        self.scheduler.schedule(future);
817    }
818}
819
820// ===== impl CurrentRunner =====
821
822impl CurrentRunner {
823    fn set_spawn<F, R>(&self, spawn: &mut dyn SpawnLocal, f: F) -> R
824    where
825        F: FnOnce() -> R,
826    {
827        struct Reset<'a>(&'a CurrentRunner);
828
829        impl<'a> Drop for Reset<'a> {
830            fn drop(&mut self) {
831                self.0.spawn.set(None);
832                self.0.id.set(None);
833            }
834        }
835
836        let _reset = Reset(self);
837
838        let spawn = unsafe { hide_lt(spawn as *mut dyn SpawnLocal) };
839        self.spawn.set(Some(spawn));
840
841        f()
842    }
843}
844
845unsafe fn hide_lt<'a>(p: *mut (dyn SpawnLocal + 'a)) -> *mut (dyn SpawnLocal + 'static) {
846    use std::mem;
847    mem::transmute(p)
848}
849
850// ===== impl RunTimeoutError =====
851
852impl RunTimeoutError {
853    fn new(timeout: bool) -> Self {
854        RunTimeoutError { timeout }
855    }
856
857    /// Returns `true` if the error was caused by the operation timing out.
858    pub fn is_timeout(&self) -> bool {
859        self.timeout
860    }
861}
862
863impl From<tokio_executor::EnterError> for RunTimeoutError {
864    fn from(_: tokio_executor::EnterError) -> Self {
865        RunTimeoutError::new(false)
866    }
867}
868
869// ===== impl BlockError =====
870
871impl<T> BlockError<T> {
872    /// Returns the error yielded by the future being blocked on
873    pub fn into_inner(self) -> Option<T> {
874        self.inner
875    }
876}
877
878impl<T> From<tokio_executor::EnterError> for BlockError<T> {
879    fn from(_: tokio_executor::EnterError) -> Self {
880        BlockError { inner: None }
881    }
882}