1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::Error;
use async_trait::async_trait;
use futures::Future;
use std::{io, net::SocketAddr, time};
use tokio_executor::current_thread;
use tokio_net::driver::Reactor;
use tokio_timer::{clock::Clock, timer};
mod net;
#[derive(Debug, Clone)]
pub struct SingleThreadedRuntimeHandle {
    executor_handle: current_thread::Handle,
    clock_handle: Clock,
    timer_handle: timer::Handle,
}

#[async_trait]
impl crate::Environment for SingleThreadedRuntimeHandle {
    type TcpStream = tokio::net::TcpStream;
    type TcpListener = tokio::net::TcpListener;
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.executor_handle
            .spawn(future)
            .expect("failed to spawn task")
    }
    fn now(&self) -> time::Instant {
        self.clock_handle.now()
    }
    fn delay(&self, deadline: time::Instant) -> tokio::timer::Delay {
        self.timer_handle.delay(deadline)
    }
    fn timeout<T>(&self, value: T, timeout: time::Duration) -> tokio::timer::Timeout<T> {
        self.timer_handle.timeout(value, timeout)
    }
    async fn bind<A>(&self, addr: A) -> Result<Self::TcpListener, io::Error>
    where
        A: Into<SocketAddr> + Send + Sync,
    {
        tokio::net::TcpListener::bind(addr.into()).await
    }
    async fn connect<A>(&self, addr: A) -> Result<Self::TcpStream, io::Error>
    where
        A: Into<SocketAddr> + Send + Sync,
    {
        tokio::net::TcpStream::connect(addr.into()).await
    }
}

pub struct SingleThreadedRuntime {
    reactor_handle: tokio_net::driver::Handle,
    timer_handle: tokio_timer::timer::Handle,
    clock: Clock,
    executor: current_thread::CurrentThread<timer::Timer<Reactor>>,
}

impl SingleThreadedRuntime {
    pub fn new() -> Result<Self, Error> {
        let reactor = Reactor::new().map_err(|source| Error::RuntimeBuild { source })?;
        let reactor_handle = reactor.handle();
        let clock = Clock::new();
        let timer = tokio_timer::Timer::new_with_now(reactor, clock.clone());
        let timer_handle = timer.handle();
        let executor = current_thread::CurrentThread::new_with_park(timer);
        let runtime = SingleThreadedRuntime {
            reactor_handle,
            timer_handle,
            clock,
            executor,
        };
        Ok(runtime)
    }

    pub fn handle(&self) -> SingleThreadedRuntimeHandle {
        let executor_handle = self.executor.handle();
        let clock_handle = self.clock.clone();
        let timer_handle = self.timer_handle.clone();
        SingleThreadedRuntimeHandle {
            executor_handle,
            clock_handle,
            timer_handle,
        }
    }
    pub fn spawn<F>(&mut self, future: F) -> &mut Self
    where
        F: Future<Output = ()> + 'static,
    {
        self.executor.spawn(future);
        self
    }

    pub fn run(&mut self) -> Result<(), Error> {
        self.enter(|executor| executor.run())
            .map_err(|source| Error::CurrentThreadRun { source })
    }

    pub fn block_on<F>(&mut self, f: F) -> F::Output
    where
        F: Future,
    {
        self.enter(|executor| executor.block_on(f))
    }

    fn enter<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut current_thread::CurrentThread<timer::Timer<Reactor>>) -> R,
    {
        let SingleThreadedRuntime {
            ref reactor_handle,
            ref timer_handle,
            ref clock,
            ref mut executor,
        } = *self;
        let _reactor = tokio_net::driver::set_default(&reactor_handle);
        let clock = clock;
        tokio_timer::clock::with_default(&clock, || {
            let _timer = tokio_timer::timer::set_default(&timer_handle);
            let mut default_executor = tokio_executor::current_thread::TaskExecutor::current();
            tokio_executor::with_default(&mut default_executor, || f(executor))
        })
    }
}