1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! Connector with hyper backend.

use std::{fmt, rc::Rc, str::FromStr};

use futures::{future::result, Future, Stream};
use hyper::{
    self,
    client::{connect::Connect, Client},
    Body, Method, Request, Uri,
};
use hyper_tls::HttpsConnector;

use telegram_bot_fork_raw::{
    Body as TelegramBody, HttpRequest, HttpResponse, Method as TelegramMethod,
};

use errors::Error;
use future::{NewTelegramFuture, TelegramFuture};

use super::_base::Connector;

/// This connector uses `hyper` backend.
pub struct HyperConnector<C> {
    inner: Rc<Client<C>>,
}

impl<C> fmt::Debug for HyperConnector<C> {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        "hyper connector".fmt(formatter)
    }
}

impl<C> HyperConnector<C> {
    pub fn new(client: Client<C>) -> Self {
        HyperConnector {
            inner: Rc::new(client),
        }
    }
}

impl<C: Connect + 'static> Connector for HyperConnector<C> {
    fn request(
        &self,
        url: Option<&str>,
        token: &str,
        req: HttpRequest,
    ) -> TelegramFuture<HttpResponse> {
        let uri = result(Uri::from_str(&req.url.url(url, token))).from_err();

        let client = self.inner.clone();
        let request = uri.and_then(move |uri| {
            let method = match req.method {
                TelegramMethod::Get => Method::GET,
                TelegramMethod::Post => Method::POST,
            };
            let mut builder = Request::builder();
            let http_request = builder.method(method).uri(uri);

            let http_request = match req.body {
                TelegramBody::Empty => http_request.body(Body::empty()).unwrap(),
                TelegramBody::Json(body) => {
                    let mut r = http_request.body(body.into()).unwrap();
                    r.headers_mut().insert(
                        hyper::header::CONTENT_TYPE,
                        "application/json".parse().unwrap(),
                    );
                    r
                }
                body => panic!("Unknown body type {:?}", body),
            };

            client.request(http_request).from_err()
        });

        let future = request.and_then(move |response| {
            response.into_body().from_err().fold(
                vec![],
                |mut result, chunk| -> Result<Vec<u8>, Error> {
                    result.extend_from_slice(&chunk);
                    Ok(result)
                },
            )
        });

        let future = future.and_then(|body| Ok(HttpResponse { body: Some(body) }));

        TelegramFuture::new(Box::new(future))
    }
}

/// Returns default hyper connector. Uses one resolve thread and `HttpsConnector`.
pub fn default_connector() -> Result<Box<Connector>, Error> {
    let connector = HttpsConnector::new(1).map_err(|err| {
        ::std::io::Error::new(::std::io::ErrorKind::Other, format!("tls error: {}", err))
    })?;
    let client = Client::builder().build(connector);
    Ok(Box::new(HyperConnector::new(client)))
}