ntex_h2/client/
connector.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::{marker::PhantomData, ops};

use ntex_bytes::ByteString;
use ntex_http::uri::Scheme;
use ntex_io::IoBoxed;
use ntex_net::connect::{self as connect, Address, Connect, Connector as DefaultConnector};
use ntex_service::{IntoService, Pipeline, Service};
use ntex_util::time::timeout_checked;

use crate::{client::ClientError, client::SimpleClient, config::Config};

#[derive(Debug)]
/// Http2 client connector
pub struct Connector<A: Address, T> {
    connector: Pipeline<T>,
    config: Config,
    scheme: Scheme,

    _t: PhantomData<A>,
}

impl<A, T> Connector<A, T>
where
    A: Address,
    T: Service<Connect<A>, Error = connect::ConnectError>,
    IoBoxed: From<T::Response>,
{
    /// Create new http2 connector
    pub fn new<F>(connector: F) -> Connector<A, T>
    where
        F: IntoService<T, Connect<A>>,
    {
        Connector {
            connector: Pipeline::new(connector.into_service()),
            config: Config::client(),
            scheme: Scheme::HTTP,
            _t: PhantomData,
        }
    }
}

impl<A> Default for Connector<A, DefaultConnector<A>>
where
    A: Address,
{
    /// Create new h2 connector
    fn default() -> Self {
        Connector {
            connector: DefaultConnector::default().into(),
            config: Config::client(),
            scheme: Scheme::HTTP,
            _t: PhantomData,
        }
    }
}

impl<A: Address, T> ops::Deref for Connector<A, T> {
    type Target = Config;

    fn deref(&self) -> &Self::Target {
        &self.config
    }
}

impl<A: Address, T> ops::DerefMut for Connector<A, T> {
    fn deref_mut(&mut self) -> &mut Config {
        &mut self.config
    }
}

impl<A, T> Connector<A, T>
where
    A: Address,
{
    #[inline]
    /// Set scheme
    pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
        self.scheme = scheme;
        self
    }

    /// Use custom connector
    pub fn connector<U, F>(&self, connector: F) -> Connector<A, U>
    where
        F: IntoService<U, Connect<A>>,
        U: Service<Connect<A>, Error = connect::ConnectError>,
        IoBoxed: From<U::Response>,
    {
        Connector {
            connector: connector.into_service().into(),
            config: self.config.clone(),
            scheme: self.scheme.clone(),
            _t: PhantomData,
        }
    }
}

impl<A, T> Connector<A, T>
where
    A: Address,
    T: Service<Connect<A>, Error = connect::ConnectError>,
    IoBoxed: From<T::Response>,
{
    /// Connect to http2 server
    pub async fn connect(&self, address: A) -> Result<SimpleClient, ClientError> {
        let scheme = self.scheme.clone();
        let authority = ByteString::from(address.host());

        let fut = async {
            Ok::<_, ClientError>(SimpleClient::new(
                self.connector.call(Connect::new(address)).await?,
                self.config.clone(),
                scheme,
                authority,
            ))
        };

        timeout_checked(self.config.0.handshake_timeout.get(), fut)
            .await
            .map_err(|_| ClientError::HandshakeTimeout)
            .and_then(|item| item)
    }
}