ntex_h2/client/
simple.rs

1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_util::time::{Millis, Sleep};
8
9use crate::connection::Connection;
10use crate::default::DefaultControlService;
11use crate::dispatcher::Dispatcher;
12use crate::{codec::Codec, config::Config, OperationError};
13
14use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
15
/// Http2 client
///
/// A handle around a reference-counted `ClientRef`. The derived `Clone`
/// clones the inner `Rc`, so every clone refers to the same shared state.
#[derive(Clone)]
pub struct SimpleClient(Rc<ClientRef>);
19
/// Inner state of the http2 client, shared by all `SimpleClient` handles
/// through an `Rc`.
struct ClientRef {
    /// Client id; produced by `gen_id()` when the client is constructed.
    id: ByteString,
    /// The http/2 connection this client sends requests on.
    con: Connection,
    /// Authority passed to the connection with every request.
    authority: ByteString,
    /// Storage used to register streams returned by `send_request`.
    storage: InflightStorage,
}
27
28impl SimpleClient {
29    /// Construct new `Client` instance.
30    pub fn new<T>(io: T, config: Config, scheme: Scheme, authority: ByteString) -> Self
31    where
32        IoBoxed: From<T>,
33    {
34        SimpleClient::with_params(
35            io.into(),
36            config,
37            scheme,
38            authority,
39            false,
40            InflightStorage::default(),
41        )
42    }
43
44    pub(super) fn with_params(
45        io: IoBoxed,
46        config: Config,
47        scheme: Scheme,
48        authority: ByteString,
49        skip_unknown_streams: bool,
50        storage: InflightStorage,
51    ) -> Self {
52        let codec = Codec::default();
53        let con = Connection::new(io.get_ref(), codec, config, false, skip_unknown_streams);
54        con.set_secure(scheme == Scheme::HTTPS);
55
56        let disp = Dispatcher::new(
57            con.clone(),
58            DefaultControlService,
59            HandleService::new(storage.clone()),
60        );
61
62        let fut = IoDispatcher::new(
63            io,
64            con.codec().clone(),
65            disp,
66            &con.config().dispatcher_config,
67        );
68        let _ = ntex_util::spawn(async move {
69            let _ = fut.await;
70        });
71
72        SimpleClient(Rc::new(ClientRef {
73            con,
74            authority,
75            storage,
76            id: gen_id(),
77        }))
78    }
79
80    #[inline]
81    /// Get client id
82    pub fn id(&self) -> &ByteString {
83        &self.0.id
84    }
85
86    #[inline]
87    /// Get io tag
88    pub fn tag(&self) -> &'static str {
89        self.0.con.tag()
90    }
91
92    #[inline]
93    /// Send request to the peer
94    pub async fn send(
95        &self,
96        method: Method,
97        path: ByteString,
98        headers: HeaderMap,
99        eof: bool,
100    ) -> Result<(SendStream, RecvStream), OperationError> {
101        let stream = self
102            .0
103            .con
104            .send_request(self.0.authority.clone(), method, path, headers, eof)
105            .await?;
106
107        Ok(self.0.storage.inflight(stream))
108    }
109
110    #[inline]
111    /// Check if client is allowed to send new request
112    ///
113    /// Readiness depends on number of opened streams and max concurrency setting
114    pub fn is_ready(&self) -> bool {
115        self.0.con.can_create_new_stream()
116    }
117
118    #[inline]
119    /// Check client readiness
120    ///
121    /// Client is ready when it is possible to start new stream
122    pub async fn ready(&self) -> Result<(), OperationError> {
123        self.0.con.ready().await
124    }
125
126    #[inline]
127    /// Gracefully close connection
128    pub fn close(&self) {
129        log::debug!("Closing client");
130        self.0.con.disconnect_when_ready()
131    }
132
133    #[inline]
134    /// Close connection
135    pub fn force_close(&self) {
136        self.0.con.close()
137    }
138
139    #[inline]
140    /// Gracefully disconnect connection
141    ///
142    /// Connection force closes if `ClientDisconnect` get dropped
143    pub fn disconnect(&self) -> ClientDisconnect {
144        ClientDisconnect::new(self.clone())
145    }
146
147    #[inline]
148    /// Check if connection is closed
149    pub fn is_closed(&self) -> bool {
150        self.0.con.is_closed()
151    }
152
153    #[inline]
154    /// Check if connection is disconnecting
155    pub fn is_disconnecting(&self) -> bool {
156        self.0.con.is_disconnecting()
157    }
158
159    #[inline]
160    /// Notify when connection get closed
161    pub fn on_disconnect(&self) -> OnDisconnect {
162        self.0.con.io().on_disconnect()
163    }
164
165    #[inline]
166    /// Client's authority
167    pub fn authority(&self) -> &ByteString {
168        &self.0.authority
169    }
170
171    /// Get max number of active streams
172    pub fn max_streams(&self) -> Option<u32> {
173        self.0.con.max_streams()
174    }
175
176    /// Get number of active streams
177    pub fn active_streams(&self) -> u32 {
178        self.0.con.active_streams()
179    }
180
181    #[doc(hidden)]
182    /// Get number of active streams
183    pub fn pings_count(&self) -> u16 {
184        self.0.con.pings_count()
185    }
186
187    #[doc(hidden)]
188    /// Get access to underlining io object
189    pub fn io_ref(&self) -> &IoRef {
190        self.0.con.io()
191    }
192
193    #[doc(hidden)]
194    /// Get access to underlining http/2 connection object
195    pub fn connection(&self) -> &Connection {
196        &self.0.con
197    }
198}
199
200impl Drop for SimpleClient {
201    fn drop(&mut self) {
202        if Rc::strong_count(&self.0) == 1 {
203            log::debug!("Last h2 client has been dropped, disconnecting");
204            self.0.con.disconnect_when_ready()
205        }
206    }
207}
208
209impl fmt::Debug for SimpleClient {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        f.debug_struct("ntex_h2::SimpleClient")
212            .field("authority", &self.0.authority)
213            .field("connection", &self.0.con)
214            .finish()
215    }
216}
217
/// Future returned by `SimpleClient::disconnect`.
///
/// Completes when the io disconnect notification fires, or with
/// `OperationError::Disconnected` if a configured timeout elapses first.
/// Dropping the value closes the connection.
#[derive(Debug)]
pub struct ClientDisconnect {
    /// Client handle used to query and close the connection.
    client: SimpleClient,
    /// Disconnect notification obtained from `SimpleClient::on_disconnect`.
    disconnect: OnDisconnect,
    /// Optional timeout; `None` unless `disconnect_timeout` was called.
    timeout: Option<Sleep>,
}
224
225impl ClientDisconnect {
226    fn new(client: SimpleClient) -> Self {
227        log::debug!("Disconnecting client");
228
229        client.0.con.disconnect_when_ready();
230        ClientDisconnect {
231            disconnect: client.on_disconnect(),
232            timeout: None,
233            client,
234        }
235    }
236
237    pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
238    where
239        Millis: From<T>,
240    {
241        self.timeout = Some(Sleep::new(timeout.into()));
242        self
243    }
244}
245
impl Drop for ClientDisconnect {
    /// Closes the connection when the disconnect future is dropped.
    fn drop(&mut self) {
        // Unconditional: runs whether or not the future ever completed.
        self.client.0.con.close();
    }
}
251
252impl Future for ClientDisconnect {
253    type Output = Result<(), OperationError>;
254
255    #[inline]
256    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
257        let mut this = self.as_mut();
258
259        if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
260            return Poll::Ready(this.client.0.con.check_error_with_disconnect());
261        } else if let Some(ref mut sleep) = this.timeout {
262            if sleep.poll_elapsed(cx).is_ready() {
263                this.client.0.con.close();
264                return Poll::Ready(Err(OperationError::Disconnected));
265            }
266        }
267        Poll::Pending
268    }
269}
270
271fn gen_id() -> ByteString {
272    const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
273
274    let mut rng = nanorand::tls_rng();
275    let mut id = String::with_capacity(16);
276    for _ in 0..16 {
277        let idx = rng.generate_range::<usize, _>(..BASE.len());
278        id.push_str(&BASE[idx..idx + 1]);
279    }
280    ByteString::from(id)
281}