1#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.7")]
2#![deny(missing_docs, missing_debug_implementations)]
3
4extern crate futures;
37extern crate tokio_executor;
38
39mod scheduler;
40
41use self::scheduler::Scheduler;
42
43use tokio_executor::park::{Park, ParkThread, Unpark};
44use tokio_executor::{Enter, SpawnError};
45
46use futures::future::{ExecuteError, ExecuteErrorKind, Executor};
47use futures::{executor, Async, Future};
48
49use std::cell::Cell;
50use std::error::Error;
51use std::fmt;
52use std::rc::Rc;
53use std::sync::{atomic, mpsc, Arc};
54use std::thread;
55use std::time::{Duration, Instant};
56
57pub struct CurrentThread<P: Park = ParkThread> {
59 scheduler: Scheduler<P::Unpark>,
61
62 num_futures: Arc<atomic::AtomicUsize>,
67
68 park: P,
70
71 spawn_handle: Handle,
73
74 spawn_receiver: mpsc::Receiver<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>,
76
77 id: u64,
79}
80
81#[derive(Debug, Clone)]
89pub struct TaskExecutor {
90 _p: ::std::marker::PhantomData<Rc<()>>,
92}
93
94#[derive(Debug)]
96pub struct Turn {
97 polled: bool,
98}
99
100impl Turn {
101 pub fn has_polled(&self) -> bool {
103 self.polled
104 }
105}
106
107pub struct Entered<'a, P: Park + 'a> {
109 executor: &'a mut CurrentThread<P>,
110 enter: &'a mut Enter,
111}
112
113#[derive(Debug)]
115pub struct RunError {
116 _p: (),
117}
118
119impl fmt::Display for RunError {
120 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
121 write!(fmt, "{}", self.description())
122 }
123}
124
125impl Error for RunError {
126 fn description(&self) -> &str {
127 "Run error"
128 }
129}
130
131#[derive(Debug)]
133pub struct RunTimeoutError {
134 timeout: bool,
135}
136
137impl fmt::Display for RunTimeoutError {
138 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
139 write!(fmt, "{}", self.description())
140 }
141}
142
143impl Error for RunTimeoutError {
144 fn description(&self) -> &str {
145 if self.timeout {
146 "Run timeout error (timeout)"
147 } else {
148 "Run timeout error (not timeout)"
149 }
150 }
151}
152
153#[derive(Debug)]
155pub struct TurnError {
156 _p: (),
157}
158
159impl fmt::Display for TurnError {
160 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
161 write!(fmt, "{}", self.description())
162 }
163}
164
165impl Error for TurnError {
166 fn description(&self) -> &str {
167 "Turn error"
168 }
169}
170
171#[derive(Debug)]
173pub struct BlockError<T> {
174 inner: Option<T>,
175}
176
177impl<T> fmt::Display for BlockError<T> {
178 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
179 write!(fmt, "Block error")
180 }
181}
182
183impl<T: fmt::Debug> Error for BlockError<T> {
184 fn description(&self) -> &str {
185 "Block error"
186 }
187}
188
189struct Borrow<'a, U: 'a> {
191 id: u64,
192 scheduler: &'a mut Scheduler<U>,
193 num_futures: &'a atomic::AtomicUsize,
194}
195
196trait SpawnLocal {
197 fn spawn_local(
198 &mut self,
199 future: Box<dyn Future<Item = (), Error = ()>>,
200 already_counted: bool,
201 );
202}
203
204struct CurrentRunner {
205 spawn: Cell<Option<*mut dyn SpawnLocal>>,
206 id: Cell<Option<u64>>,
207}
208
209thread_local! {
210 static CURRENT: CurrentRunner = CurrentRunner {
212 spawn: Cell::new(None),
213 id: Cell::new(None),
214 }
215}
216
217thread_local! {
218 static EXECUTOR_ID: Cell<u64> = Cell::new(0)
223}
224
225pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
241where
242 F: Future,
243{
244 let mut current_thread = CurrentThread::new();
245
246 let ret = current_thread.block_on(future);
247 current_thread.run().unwrap();
248
249 ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
250}
251
252pub fn spawn<F>(future: F)
266where
267 F: Future<Item = (), Error = ()> + 'static,
268{
269 TaskExecutor::current()
270 .spawn_local(Box::new(future))
271 .unwrap();
272}
273
274impl CurrentThread<ParkThread> {
277 pub fn new() -> Self {
279 CurrentThread::new_with_park(ParkThread::new())
280 }
281}
282
283impl<P: Park> CurrentThread<P> {
284 pub fn new_with_park(park: P) -> Self {
287 let unpark = park.unpark();
288
289 let (spawn_sender, spawn_receiver) = mpsc::channel();
290 let thread = thread::current().id();
291 let id = EXECUTOR_ID.with(|idc| {
292 let id = idc.get();
293 idc.set(id + 1);
294 id
295 });
296
297 let scheduler = Scheduler::new(unpark);
298 let notify = scheduler.notify();
299
300 let num_futures = Arc::new(atomic::AtomicUsize::new(0));
301
302 CurrentThread {
303 scheduler: scheduler,
304 num_futures: num_futures.clone(),
305 park,
306 id,
307 spawn_handle: Handle {
308 sender: spawn_sender,
309 num_futures: num_futures,
310 notify: notify,
311 shut_down: Cell::new(false),
312 thread: thread,
313 id,
314 },
315 spawn_receiver: spawn_receiver,
316 }
317 }
318
319 pub fn is_idle(&self) -> bool {
326 self.num_futures.load(atomic::Ordering::SeqCst) <= 1
327 }
328
329 pub fn spawn<F>(&mut self, future: F) -> &mut Self
333 where
334 F: Future<Item = (), Error = ()> + 'static,
335 {
336 self.borrow().spawn_local(Box::new(future), false);
337 self
338 }
339
340 pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
354 where
355 F: Future,
356 {
357 let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
358 self.enter(&mut enter).block_on(future)
359 }
360
361 pub fn run(&mut self) -> Result<(), RunError> {
364 let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
365 self.enter(&mut enter).run()
366 }
367
368 pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
371 let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
372 self.enter(&mut enter).run_timeout(duration)
373 }
374
375 pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
379 let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
380 self.enter(&mut enter).turn(duration)
381 }
382
383 pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> {
385 Entered {
386 executor: self,
387 enter,
388 }
389 }
390
391 pub fn get_park(&self) -> &P {
393 &self.park
394 }
395
396 pub fn get_park_mut(&mut self) -> &mut P {
398 &mut self.park
399 }
400
401 fn borrow(&mut self) -> Borrow<P::Unpark> {
402 Borrow {
403 id: self.id,
404 scheduler: &mut self.scheduler,
405 num_futures: &*self.num_futures,
406 }
407 }
408
409 pub fn handle(&self) -> Handle {
414 self.spawn_handle.clone()
415 }
416}
417
418impl<P: Park> Drop for CurrentThread<P> {
419 fn drop(&mut self) {
420 let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
426
427 let _ = pending;
433 }
434}
435
436impl tokio_executor::Executor for CurrentThread {
437 fn spawn(
438 &mut self,
439 future: Box<dyn Future<Item = (), Error = ()> + Send>,
440 ) -> Result<(), SpawnError> {
441 self.borrow().spawn_local(future, false);
442 Ok(())
443 }
444}
445
446impl<T> tokio_executor::TypedExecutor<T> for CurrentThread
447where
448 T: Future<Item = (), Error = ()> + 'static,
449{
450 fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
451 self.borrow().spawn_local(Box::new(future), false);
452 Ok(())
453 }
454}
455
456impl<P: Park> fmt::Debug for CurrentThread<P> {
457 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
458 fmt.debug_struct("CurrentThread")
459 .field("scheduler", &self.scheduler)
460 .field(
461 "num_futures",
462 &self.num_futures.load(atomic::Ordering::SeqCst),
463 )
464 .finish()
465 }
466}
467
468impl<'a, P: Park> Entered<'a, P> {
471 pub fn spawn<F>(&mut self, future: F) -> &mut Self
475 where
476 F: Future<Item = (), Error = ()> + 'static,
477 {
478 self.executor.borrow().spawn_local(Box::new(future), false);
479 self
480 }
481
482 pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
496 where
497 F: Future,
498 {
499 let mut future = executor::spawn(future);
500 let notify = self.executor.scheduler.notify();
501
502 loop {
503 let res = self
504 .executor
505 .borrow()
506 .enter(self.enter, || future.poll_future_notify(¬ify, 0));
507
508 match res {
509 Ok(Async::Ready(e)) => return Ok(e),
510 Err(e) => return Err(BlockError { inner: Some(e) }),
511 Ok(Async::NotReady) => {}
512 }
513
514 self.tick();
515
516 if let Err(_) = self.executor.park.park() {
517 return Err(BlockError { inner: None });
518 }
519 }
520 }
521
522 pub fn run(&mut self) -> Result<(), RunError> {
525 self.run_timeout2(None).map_err(|_| RunError { _p: () })
526 }
527
528 pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
531 self.run_timeout2(Some(duration))
532 }
533
534 pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
538 let res = if self.executor.scheduler.has_pending_futures() {
539 self.executor.park.park_timeout(Duration::from_millis(0))
540 } else {
541 match duration {
542 Some(duration) => self.executor.park.park_timeout(duration),
543 None => self.executor.park.park(),
544 }
545 };
546
547 if res.is_err() {
548 return Err(TurnError { _p: () });
549 }
550
551 let polled = self.tick();
552
553 Ok(Turn { polled })
554 }
555
556 pub fn get_park(&self) -> &P {
558 &self.executor.park
559 }
560
561 pub fn get_park_mut(&mut self) -> &mut P {
563 &mut self.executor.park
564 }
565
566 fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> {
567 if self.executor.is_idle() {
568 return Ok(());
570 }
571
572 let mut time = dur.map(|dur| (Instant::now() + dur, dur));
573
574 loop {
575 self.tick();
576
577 if self.executor.is_idle() {
578 return Ok(());
579 }
580
581 match time {
582 Some((until, rem)) => {
583 if let Err(_) = self.executor.park.park_timeout(rem) {
584 return Err(RunTimeoutError::new(false));
585 }
586
587 let now = Instant::now();
588
589 if now >= until {
590 return Err(RunTimeoutError::new(true));
591 }
592
593 time = Some((until, until - now));
594 }
595 None => {
596 if let Err(_) = self.executor.park.park() {
597 return Err(RunTimeoutError::new(false));
598 }
599 }
600 }
601 }
602 }
603
604 fn tick(&mut self) -> bool {
606 let (mut borrow, spawn_receiver) = (
611 Borrow {
612 id: self.executor.id,
613 scheduler: &mut self.executor.scheduler,
614 num_futures: &*self.executor.num_futures,
615 },
616 &mut self.executor.spawn_receiver,
617 );
618
619 while let Ok(future) = spawn_receiver.try_recv() {
620 borrow.spawn_local(future, true);
621 }
622
623 borrow
625 .scheduler
626 .tick(borrow.id, &mut *self.enter, borrow.num_futures)
627 }
628}
629
630impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
631 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
632 fmt.debug_struct("Entered")
633 .field("executor", &self.executor)
634 .field("enter", &self.enter)
635 .finish()
636 }
637}
638
639#[derive(Clone)]
643pub struct Handle {
644 sender: mpsc::Sender<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>,
645 num_futures: Arc<atomic::AtomicUsize>,
646 shut_down: Cell<bool>,
647 notify: executor::NotifyHandle,
648 thread: thread::ThreadId,
649
650 id: u64,
652}
653
654impl fmt::Debug for Handle {
656 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
657 fmt.debug_struct("Handle")
658 .field("shut_down", &self.shut_down.get())
659 .finish()
660 }
661}
662
663impl Handle {
664 pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
671 where
672 F: Future<Item = (), Error = ()> + Send + 'static,
673 {
674 if thread::current().id() == self.thread {
675 let mut e = TaskExecutor::current();
676 if e.id() == Some(self.id) {
677 return e.spawn_local(Box::new(future));
678 }
679 }
680
681 if self.shut_down.get() {
682 return Err(SpawnError::shutdown());
683 }
684
685 let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
687 if pending % 2 == 1 {
688 self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst);
690
691 self.shut_down.set(true);
693
694 return Err(SpawnError::shutdown());
695 }
696
697 self.sender
698 .send(Box::new(future))
699 .expect("CurrentThread does not exist anymore");
700 self.notify.notify(0);
702 Ok(())
703 }
704
705 pub fn status(&self) -> Result<(), SpawnError> {
715 if self.shut_down.get() {
716 return Err(SpawnError::shutdown());
717 }
718
719 Ok(())
720 }
721}
722
723impl TaskExecutor {
726 pub fn current() -> TaskExecutor {
733 TaskExecutor {
734 _p: ::std::marker::PhantomData,
735 }
736 }
737
738 fn id(&self) -> Option<u64> {
740 CURRENT.with(|current| current.id.get())
741 }
742
743 pub fn spawn_local(
745 &mut self,
746 future: Box<dyn Future<Item = (), Error = ()>>,
747 ) -> Result<(), SpawnError> {
748 CURRENT.with(|current| match current.spawn.get() {
749 Some(spawn) => {
750 unsafe { (*spawn).spawn_local(future, false) };
751 Ok(())
752 }
753 None => Err(SpawnError::shutdown()),
754 })
755 }
756}
757
758impl tokio_executor::Executor for TaskExecutor {
759 fn spawn(
760 &mut self,
761 future: Box<dyn Future<Item = (), Error = ()> + Send>,
762 ) -> Result<(), SpawnError> {
763 self.spawn_local(future)
764 }
765}
766
767impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor
768where
769 F: Future<Item = (), Error = ()> + 'static,
770{
771 fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
772 self.spawn_local(Box::new(future))
773 }
774}
775
776impl<F> Executor<F> for TaskExecutor
777where
778 F: Future<Item = (), Error = ()> + 'static,
779{
780 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
781 CURRENT.with(|current| match current.spawn.get() {
782 Some(spawn) => {
783 unsafe { (*spawn).spawn_local(Box::new(future), false) };
784 Ok(())
785 }
786 None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)),
787 })
788 }
789}
790
791impl<'a, U: Unpark> Borrow<'a, U> {
794 fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R
795 where
796 F: FnOnce() -> R,
797 {
798 CURRENT.with(|current| {
799 current.id.set(Some(self.id));
800 current.set_spawn(self, || f())
801 })
802 }
803}
804
805impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
806 fn spawn_local(
807 &mut self,
808 future: Box<dyn Future<Item = (), Error = ()>>,
809 already_counted: bool,
810 ) {
811 if !already_counted {
812 self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
815 }
816 self.scheduler.schedule(future);
817 }
818}
819
820impl CurrentRunner {
823 fn set_spawn<F, R>(&self, spawn: &mut dyn SpawnLocal, f: F) -> R
824 where
825 F: FnOnce() -> R,
826 {
827 struct Reset<'a>(&'a CurrentRunner);
828
829 impl<'a> Drop for Reset<'a> {
830 fn drop(&mut self) {
831 self.0.spawn.set(None);
832 self.0.id.set(None);
833 }
834 }
835
836 let _reset = Reset(self);
837
838 let spawn = unsafe { hide_lt(spawn as *mut dyn SpawnLocal) };
839 self.spawn.set(Some(spawn));
840
841 f()
842 }
843}
844
845unsafe fn hide_lt<'a>(p: *mut (dyn SpawnLocal + 'a)) -> *mut (dyn SpawnLocal + 'static) {
846 use std::mem;
847 mem::transmute(p)
848}
849
850impl RunTimeoutError {
853 fn new(timeout: bool) -> Self {
854 RunTimeoutError { timeout }
855 }
856
857 pub fn is_timeout(&self) -> bool {
859 self.timeout
860 }
861}
862
863impl From<tokio_executor::EnterError> for RunTimeoutError {
864 fn from(_: tokio_executor::EnterError) -> Self {
865 RunTimeoutError::new(false)
866 }
867}
868
869impl<T> BlockError<T> {
872 pub fn into_inner(self) -> Option<T> {
874 self.inner
875 }
876}
877
878impl<T> From<tokio_executor::EnterError> for BlockError<T> {
879 fn from(_: tokio_executor::EnterError) -> Self {
880 BlockError { inner: None }
881 }
882}