ntex_h2/client/
simple.rs

1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{HeaderMap, Method, uri::Scheme};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_service::cfg::Cfg;
8use ntex_util::{channel::pool, time::Millis, time::Sleep};
9
10use crate::connection::Connection;
11use crate::default::DefaultControlService;
12use crate::dispatcher::Dispatcher;
13use crate::{OperationError, ServiceConfig, codec::Codec};
14
15use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
16
17/// Http2 client
18#[derive(Clone)]
19pub struct SimpleClient(Rc<ClientRef>);
20
21/// Http2 client
22struct ClientRef {
23    id: ByteString,
24    con: Connection,
25    authority: ByteString,
26    storage: InflightStorage,
27}
28
29impl SimpleClient {
30    /// Construct new `Client` instance.
31    pub fn new<T>(io: T, scheme: Scheme, authority: ByteString) -> Self
32    where
33        IoBoxed: From<T>,
34    {
35        let io: IoBoxed = io.into();
36        let cfg = io.shared().get();
37        SimpleClient::with_params(
38            io,
39            cfg,
40            scheme,
41            authority,
42            false,
43            InflightStorage::default(),
44            pool::new(),
45        )
46    }
47
48    pub(super) fn with_params(
49        io: IoBoxed,
50        cfg: Cfg<ServiceConfig>,
51        scheme: Scheme,
52        authority: ByteString,
53        skip_unknown_streams: bool,
54        storage: InflightStorage,
55        pool: pool::Pool<()>,
56    ) -> Self {
57        let codec = Codec::default();
58        let con = Connection::new(
59            false,
60            io.get_ref(),
61            codec,
62            cfg,
63            false,
64            skip_unknown_streams,
65            pool,
66        );
67        con.set_secure(scheme == Scheme::HTTPS);
68
69        let disp = Dispatcher::new(
70            con.clone(),
71            DefaultControlService,
72            HandleService::new(storage.clone()),
73        );
74
75        let fut = IoDispatcher::new(io, con.codec().clone(), disp);
76        let _ = ntex_util::spawn(async move {
77            let _ = fut.await;
78        });
79
80        SimpleClient(Rc::new(ClientRef {
81            con,
82            authority,
83            storage,
84            id: gen_id(),
85        }))
86    }
87
88    #[inline]
89    /// Get client id
90    pub fn id(&self) -> &ByteString {
91        &self.0.id
92    }
93
94    #[inline]
95    /// Get io tag
96    pub fn tag(&self) -> &'static str {
97        self.0.con.tag()
98    }
99
100    #[inline]
101    /// Send request to the peer
102    pub async fn send(
103        &self,
104        method: Method,
105        path: ByteString,
106        headers: HeaderMap,
107        eof: bool,
108    ) -> Result<(SendStream, RecvStream), OperationError> {
109        let stream = self
110            .0
111            .con
112            .send_request(self.0.authority.clone(), method, path, headers, eof)
113            .await?;
114
115        Ok(self.0.storage.inflight(stream))
116    }
117
118    #[inline]
119    /// Check if client is allowed to send new request
120    ///
121    /// Readiness depends on number of opened streams and max concurrency setting
122    pub fn is_ready(&self) -> bool {
123        self.0.con.can_create_new_stream()
124    }
125
126    #[inline]
127    /// Check client readiness
128    ///
129    /// Client is ready when it is possible to start new stream
130    pub async fn ready(&self) -> Result<(), OperationError> {
131        self.0.con.ready().await
132    }
133
134    #[inline]
135    /// Gracefully close connection
136    pub fn close(&self) {
137        log::debug!("Closing client");
138        self.0.con.disconnect_when_ready()
139    }
140
141    #[inline]
142    /// Close connection
143    pub fn force_close(&self) {
144        self.0.con.close()
145    }
146
147    #[inline]
148    /// Gracefully disconnect connection
149    ///
150    /// Connection force closes if `ClientDisconnect` get dropped
151    pub fn disconnect(&self) -> ClientDisconnect {
152        ClientDisconnect::new(self.clone())
153    }
154
155    #[inline]
156    /// Check if connection is closed
157    pub fn is_closed(&self) -> bool {
158        self.0.con.is_closed()
159    }
160
161    #[inline]
162    /// Check if connection is disconnecting
163    pub fn is_disconnecting(&self) -> bool {
164        self.0.con.is_disconnecting()
165    }
166
167    #[inline]
168    /// Notify when connection get closed
169    pub fn on_disconnect(&self) -> OnDisconnect {
170        self.0.con.io().on_disconnect()
171    }
172
173    #[inline]
174    /// Client's authority
175    pub fn authority(&self) -> &ByteString {
176        &self.0.authority
177    }
178
179    /// Get max number of active streams
180    pub fn max_streams(&self) -> Option<u32> {
181        self.0.con.max_streams()
182    }
183
184    /// Get number of active streams
185    pub fn active_streams(&self) -> u32 {
186        self.0.con.active_streams()
187    }
188
189    #[doc(hidden)]
190    /// Get number of active streams
191    pub fn pings_count(&self) -> u16 {
192        self.0.con.pings_count()
193    }
194
195    #[doc(hidden)]
196    /// Get access to underlining io object
197    pub fn io_ref(&self) -> &IoRef {
198        self.0.con.io()
199    }
200
201    #[doc(hidden)]
202    /// Get access to underlining http/2 connection object
203    pub fn connection(&self) -> &Connection {
204        &self.0.con
205    }
206}
207
208impl Drop for SimpleClient {
209    fn drop(&mut self) {
210        if Rc::strong_count(&self.0) == 1 {
211            log::debug!("Last h2 client has been dropped, disconnecting");
212            self.0.con.disconnect_when_ready()
213        }
214    }
215}
216
217impl fmt::Debug for SimpleClient {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        f.debug_struct("ntex_h2::SimpleClient")
220            .field("authority", &self.0.authority)
221            .field("connection", &self.0.con)
222            .finish()
223    }
224}
225
226#[derive(Debug)]
227pub struct ClientDisconnect {
228    client: SimpleClient,
229    disconnect: OnDisconnect,
230    timeout: Option<Sleep>,
231}
232
233impl ClientDisconnect {
234    fn new(client: SimpleClient) -> Self {
235        log::debug!("Disconnecting client");
236
237        client.0.con.disconnect_when_ready();
238        ClientDisconnect {
239            disconnect: client.on_disconnect(),
240            timeout: None,
241            client,
242        }
243    }
244
245    pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
246    where
247        Millis: From<T>,
248    {
249        self.timeout = Some(Sleep::new(timeout.into()));
250        self
251    }
252}
253
254impl Drop for ClientDisconnect {
255    fn drop(&mut self) {
256        self.client.0.con.close();
257    }
258}
259
260impl Future for ClientDisconnect {
261    type Output = Result<(), OperationError>;
262
263    #[inline]
264    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265        let mut this = self.as_mut();
266
267        if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
268            return Poll::Ready(this.client.0.con.check_error_with_disconnect());
269        } else if let Some(ref mut sleep) = this.timeout {
270            if sleep.poll_elapsed(cx).is_ready() {
271                this.client.0.con.close();
272                return Poll::Ready(Err(OperationError::Disconnected));
273            }
274        }
275        Poll::Pending
276    }
277}
278
279fn gen_id() -> ByteString {
280    const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
281
282    let mut rng = nanorand::tls_rng();
283    let mut id = String::with_capacity(16);
284    for _ in 0..16 {
285        let idx = rng.generate_range::<usize, _>(..BASE.len());
286        id.push_str(&BASE[idx..idx + 1]);
287    }
288    ByteString::from(id)
289}