1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::io;
use std::sync::Arc;

use hyper::server::{NewService, Service};
use futures::{future, Future};
use futures_cpupool::{CpuPool, CpuFuture};

use request::{Request, HyperRequest};
use response::HyperResponse;
use error::HyperError;
use middleware::Handler;

/// Service entry point that pairs a request handler with the thread pool
/// used to run it.
///
/// Both fields are reference-counted, so cloning the service (see the
/// `Clone` impl) shares the same handler and the same pool rather than
/// duplicating them.
pub struct InitialService<H: Handler> {
    /// The handler invoked for every incoming request.
    pub handler: Arc<H>,
    /// The CPU pool on which `handler` is executed.
    pub thread_pool: Arc<CpuPool>,
}

impl<H: Handler> InitialService<H> {
    /// Builds a service around `handler`.
    ///
    /// `thread_pool_size` selects the number of worker threads:
    ///
    /// * `Some(size)` creates the pool with `CpuPool::new(size)`;
    /// * `None` falls back to `CpuPool::new_num_cpus()`.
    ///
    /// NOTE(review): `CpuPool::new` is believed to panic when given a size
    /// of `0` — confirm against the futures-cpupool documentation before
    /// exposing this value to untrusted configuration.
    pub fn new(handler: H, thread_pool_size: Option<usize>) -> InitialService<H> {
        let pool = match thread_pool_size {
            Some(size) => CpuPool::new(size),
            None => CpuPool::new_num_cpus(),
        };

        InitialService {
            handler: Arc::new(handler),
            thread_pool: Arc::new(pool),
        }
    }
}

impl<H> Clone for InitialService<H>
    where H: Handler
{
    fn clone(&self) -> Self {
        InitialService {
            handler: self.handler.clone(),
            thread_pool: self.thread_pool.clone(),
        }
    }
}

impl<H: Handler> NewService for InitialService<H> {
    type Request = HyperRequest;
    type Response = HyperResponse;
    type Error = HyperError;
    type Instance = Self;

    /// Produces a service instance by cloning `self`.
    ///
    /// The clone shares the handler and thread pool with `self`; this
    /// implementation always returns `Ok`.
    fn new_service(&self) -> io::Result<Self::Instance> {
        let instance = self.clone();
        Ok(instance)
    }
}

impl<H> Service for InitialService<H>
    where H: Handler
{
    type Request = HyperRequest;
    type Response = HyperResponse;
    type Error = HyperError;
    type Future = CpuFuture<Self::Response, Self::Error>;

    fn call(&self, request: Self::Request) -> Self::Future {
        let mut request = Request::new(request);
        let handler = self.handler.clone();

        self.thread_pool.spawn_fn(move || {
            let handle_result = match handler.handle(&mut request) {
                Ok(response) => Box::new(future::ok(response)),
                Err(err) => Box::new(future::err(err))
            };
            Box::new(handle_result
                .and_then(move |response| {
                    future::ok(HyperResponse::from(response))
                })
                .or_else(move |error| {
                    future::ok(HyperResponse::from(error))
                })
            )
        })
    }
}