ntex_h2/client/
simple.rs

1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_util::time::{Millis, Sleep};
8
9use crate::connection::Connection;
10use crate::default::DefaultControlService;
11use crate::dispatcher::Dispatcher;
12use crate::{codec::Codec, config::Config, OperationError};
13
14use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
15
16/// Http2 client
17#[derive(Clone)]
18pub struct SimpleClient(Rc<ClientRef>);
19
20/// Http2 client
21struct ClientRef {
22    id: ByteString,
23    con: Connection,
24    authority: ByteString,
25    storage: InflightStorage,
26}
27
28impl SimpleClient {
29    /// Construct new `Client` instance.
30    pub fn new<T>(io: T, config: Config, scheme: Scheme, authority: ByteString) -> Self
31    where
32        IoBoxed: From<T>,
33    {
34        SimpleClient::with_params(
35            io.into(),
36            config,
37            scheme,
38            authority,
39            InflightStorage::default(),
40        )
41    }
42
43    pub(super) fn with_params(
44        io: IoBoxed,
45        config: Config,
46        scheme: Scheme,
47        authority: ByteString,
48        storage: InflightStorage,
49    ) -> Self {
50        let codec = Codec::default();
51        let con = Connection::new(io.get_ref(), codec, config, false);
52        con.set_secure(scheme == Scheme::HTTPS);
53
54        let disp = Dispatcher::new(
55            con.clone(),
56            DefaultControlService,
57            HandleService::new(storage.clone()),
58        );
59
60        let fut = IoDispatcher::new(
61            io,
62            con.codec().clone(),
63            disp,
64            &con.config().dispatcher_config,
65        );
66        let _ = ntex_util::spawn(async move {
67            let _ = fut.await;
68        });
69
70        SimpleClient(Rc::new(ClientRef {
71            con,
72            authority,
73            storage,
74            id: gen_id(),
75        }))
76    }
77
78    #[inline]
79    /// Get client id
80    pub fn id(&self) -> &ByteString {
81        &self.0.id
82    }
83
84    #[inline]
85    /// Get io tag
86    pub fn tag(&self) -> &'static str {
87        self.0.con.tag()
88    }
89
90    #[inline]
91    /// Send request to the peer
92    pub async fn send(
93        &self,
94        method: Method,
95        path: ByteString,
96        headers: HeaderMap,
97        eof: bool,
98    ) -> Result<(SendStream, RecvStream), OperationError> {
99        let stream = self
100            .0
101            .con
102            .send_request(self.0.authority.clone(), method, path, headers, eof)
103            .await?;
104
105        Ok(self.0.storage.inflight(stream))
106    }
107
108    #[inline]
109    /// Check if client is allowed to send new request
110    ///
111    /// Readiness depends on number of opened streams and max concurrency setting
112    pub fn is_ready(&self) -> bool {
113        self.0.con.can_create_new_stream()
114    }
115
116    #[inline]
117    /// Check client readiness
118    ///
119    /// Client is ready when it is possible to start new stream
120    pub async fn ready(&self) -> Result<(), OperationError> {
121        self.0.con.ready().await
122    }
123
124    #[inline]
125    /// Gracefully close connection
126    pub fn close(&self) {
127        log::debug!("Closing client");
128        self.0.con.disconnect_when_ready()
129    }
130
131    #[inline]
132    /// Close connection
133    pub fn force_close(&self) {
134        self.0.con.close()
135    }
136
137    #[inline]
138    /// Gracefully disconnect connection
139    ///
140    /// Connection force closes if `ClientDisconnect` get dropped
141    pub fn disconnect(&self) -> ClientDisconnect {
142        ClientDisconnect::new(self.clone())
143    }
144
145    #[inline]
146    /// Check if connection is closed
147    pub fn is_closed(&self) -> bool {
148        self.0.con.is_closed()
149    }
150
151    #[inline]
152    /// Check if connection is disconnecting
153    pub fn is_disconnecting(&self) -> bool {
154        self.0.con.is_disconnecting()
155    }
156
157    #[inline]
158    /// Notify when connection get closed
159    pub fn on_disconnect(&self) -> OnDisconnect {
160        self.0.con.io().on_disconnect()
161    }
162
163    #[inline]
164    /// Client's authority
165    pub fn authority(&self) -> &ByteString {
166        &self.0.authority
167    }
168
169    /// Get max number of active streams
170    pub fn max_streams(&self) -> Option<u32> {
171        self.0.con.max_streams()
172    }
173
174    /// Get number of active streams
175    pub fn active_streams(&self) -> u32 {
176        self.0.con.active_streams()
177    }
178
179    #[doc(hidden)]
180    /// Get number of active streams
181    pub fn pings_count(&self) -> u16 {
182        self.0.con.pings_count()
183    }
184
185    #[doc(hidden)]
186    /// Get access to underlining io object
187    pub fn io_ref(&self) -> &IoRef {
188        self.0.con.io()
189    }
190
191    #[doc(hidden)]
192    /// Get access to underlining http/2 connection object
193    pub fn connection(&self) -> &Connection {
194        &self.0.con
195    }
196}
197
198impl Drop for SimpleClient {
199    fn drop(&mut self) {
200        if Rc::strong_count(&self.0) == 1 {
201            log::debug!("Last h2 client has been dropped, disconnecting");
202            self.0.con.disconnect_when_ready()
203        }
204    }
205}
206
207impl fmt::Debug for SimpleClient {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("ntex_h2::SimpleClient")
210            .field("authority", &self.0.authority)
211            .field("connection", &self.0.con)
212            .finish()
213    }
214}
215
216#[derive(Debug)]
217pub struct ClientDisconnect {
218    client: SimpleClient,
219    disconnect: OnDisconnect,
220    timeout: Option<Sleep>,
221}
222
223impl ClientDisconnect {
224    fn new(client: SimpleClient) -> Self {
225        log::debug!("Disconnecting client");
226
227        client.0.con.disconnect_when_ready();
228        ClientDisconnect {
229            disconnect: client.on_disconnect(),
230            timeout: None,
231            client,
232        }
233    }
234
235    pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
236    where
237        Millis: From<T>,
238    {
239        self.timeout = Some(Sleep::new(timeout.into()));
240        self
241    }
242}
243
244impl Drop for ClientDisconnect {
245    fn drop(&mut self) {
246        self.client.0.con.close();
247    }
248}
249
250impl Future for ClientDisconnect {
251    type Output = Result<(), OperationError>;
252
253    #[inline]
254    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255        let mut this = self.as_mut();
256
257        if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
258            return Poll::Ready(this.client.0.con.check_error_with_disconnect());
259        } else if let Some(ref mut sleep) = this.timeout {
260            if sleep.poll_elapsed(cx).is_ready() {
261                this.client.0.con.close();
262                return Poll::Ready(Err(OperationError::Disconnected));
263            }
264        }
265        Poll::Pending
266    }
267}
268
269fn gen_id() -> ByteString {
270    const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
271
272    let mut rng = nanorand::tls_rng();
273    let mut id = String::with_capacity(16);
274    for _ in 0..16 {
275        let idx = rng.generate_range::<usize, _>(..BASE.len());
276        id.push_str(&BASE[idx..idx + 1]);
277    }
278    ByteString::from(id)
279}