1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_util::time::{Millis, Sleep};
8
9use crate::connection::Connection;
10use crate::default::DefaultControlService;
11use crate::dispatcher::Dispatcher;
12use crate::{codec::Codec, config::Config, OperationError};
13
14use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
15
/// HTTP/2 client handle.
///
/// This is a thin wrapper around a reference-counted `ClientRef`, so cloning
/// it only produces another handle to the same underlying connection state.
#[derive(Clone)]
pub struct SimpleClient(Rc<ClientRef>);
19
/// State shared by every clone of a `SimpleClient`.
struct ClientRef {
    /// Identifier produced by `gen_id()` when the client is constructed.
    id: ByteString,
    /// Connection that requests are sent on and that is closed on shutdown.
    con: Connection,
    /// Authority passed to `Connection::send_request` for every request.
    authority: ByteString,
    /// Storage that turns a newly opened stream into its send/receive halves;
    /// a clone of it is also handed to the `HandleService` of the dispatcher.
    storage: InflightStorage,
}
27
28impl SimpleClient {
29 pub fn new<T>(io: T, config: Config, scheme: Scheme, authority: ByteString) -> Self
31 where
32 IoBoxed: From<T>,
33 {
34 SimpleClient::with_params(
35 io.into(),
36 config,
37 scheme,
38 authority,
39 false,
40 InflightStorage::default(),
41 )
42 }
43
44 pub(super) fn with_params(
45 io: IoBoxed,
46 config: Config,
47 scheme: Scheme,
48 authority: ByteString,
49 skip_unknown_streams: bool,
50 storage: InflightStorage,
51 ) -> Self {
52 let codec = Codec::default();
53 let con = Connection::new(io.get_ref(), codec, config, false, skip_unknown_streams);
54 con.set_secure(scheme == Scheme::HTTPS);
55
56 let disp = Dispatcher::new(
57 con.clone(),
58 DefaultControlService,
59 HandleService::new(storage.clone()),
60 );
61
62 let fut = IoDispatcher::new(
63 io,
64 con.codec().clone(),
65 disp,
66 &con.config().dispatcher_config,
67 );
68 let _ = ntex_util::spawn(async move {
69 let _ = fut.await;
70 });
71
72 SimpleClient(Rc::new(ClientRef {
73 con,
74 authority,
75 storage,
76 id: gen_id(),
77 }))
78 }
79
80 #[inline]
81 pub fn id(&self) -> &ByteString {
83 &self.0.id
84 }
85
86 #[inline]
87 pub fn tag(&self) -> &'static str {
89 self.0.con.tag()
90 }
91
92 #[inline]
93 pub async fn send(
95 &self,
96 method: Method,
97 path: ByteString,
98 headers: HeaderMap,
99 eof: bool,
100 ) -> Result<(SendStream, RecvStream), OperationError> {
101 let stream = self
102 .0
103 .con
104 .send_request(self.0.authority.clone(), method, path, headers, eof)
105 .await?;
106
107 Ok(self.0.storage.inflight(stream))
108 }
109
110 #[inline]
111 pub fn is_ready(&self) -> bool {
115 self.0.con.can_create_new_stream()
116 }
117
118 #[inline]
119 pub async fn ready(&self) -> Result<(), OperationError> {
123 self.0.con.ready().await
124 }
125
126 #[inline]
127 pub fn close(&self) {
129 log::debug!("Closing client");
130 self.0.con.disconnect_when_ready()
131 }
132
133 #[inline]
134 pub fn force_close(&self) {
136 self.0.con.close()
137 }
138
139 #[inline]
140 pub fn disconnect(&self) -> ClientDisconnect {
144 ClientDisconnect::new(self.clone())
145 }
146
147 #[inline]
148 pub fn is_closed(&self) -> bool {
150 self.0.con.is_closed()
151 }
152
153 #[inline]
154 pub fn is_disconnecting(&self) -> bool {
156 self.0.con.is_disconnecting()
157 }
158
159 #[inline]
160 pub fn on_disconnect(&self) -> OnDisconnect {
162 self.0.con.io().on_disconnect()
163 }
164
165 #[inline]
166 pub fn authority(&self) -> &ByteString {
168 &self.0.authority
169 }
170
171 pub fn max_streams(&self) -> Option<u32> {
173 self.0.con.max_streams()
174 }
175
176 pub fn active_streams(&self) -> u32 {
178 self.0.con.active_streams()
179 }
180
181 #[doc(hidden)]
182 pub fn pings_count(&self) -> u16 {
184 self.0.con.pings_count()
185 }
186
187 #[doc(hidden)]
188 pub fn io_ref(&self) -> &IoRef {
190 self.0.con.io()
191 }
192
193 #[doc(hidden)]
194 pub fn connection(&self) -> &Connection {
196 &self.0.con
197 }
198}
199
200impl Drop for SimpleClient {
201 fn drop(&mut self) {
202 if Rc::strong_count(&self.0) == 1 {
203 log::debug!("Last h2 client has been dropped, disconnecting");
204 self.0.con.disconnect_when_ready()
205 }
206 }
207}
208
209impl fmt::Debug for SimpleClient {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 f.debug_struct("ntex_h2::SimpleClient")
212 .field("authority", &self.0.authority)
213 .field("connection", &self.0.con)
214 .finish()
215 }
216}
217
/// Future that completes when the client's connection has disconnected.
///
/// Returned by `SimpleClient::disconnect`. An optional timeout can be set
/// with `disconnect_timeout`. Dropping the value closes the connection.
#[derive(Debug)]
pub struct ClientDisconnect {
    /// Client handle whose connection is being shut down.
    client: SimpleClient,
    /// Disconnect notification obtained from `client.on_disconnect()`.
    disconnect: OnDisconnect,
    /// Optional timer; `None` until `disconnect_timeout` is called.
    timeout: Option<Sleep>,
}
224
225impl ClientDisconnect {
226 fn new(client: SimpleClient) -> Self {
227 log::debug!("Disconnecting client");
228
229 client.0.con.disconnect_when_ready();
230 ClientDisconnect {
231 disconnect: client.on_disconnect(),
232 timeout: None,
233 client,
234 }
235 }
236
237 pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
238 where
239 Millis: From<T>,
240 {
241 self.timeout = Some(Sleep::new(timeout.into()));
242 self
243 }
244}
245
246impl Drop for ClientDisconnect {
247 fn drop(&mut self) {
248 self.client.0.con.close();
249 }
250}
251
252impl Future for ClientDisconnect {
253 type Output = Result<(), OperationError>;
254
255 #[inline]
256 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
257 let mut this = self.as_mut();
258
259 if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
260 return Poll::Ready(this.client.0.con.check_error_with_disconnect());
261 } else if let Some(ref mut sleep) = this.timeout {
262 if sleep.poll_elapsed(cx).is_ready() {
263 this.client.0.con.close();
264 return Poll::Ready(Err(OperationError::Disconnected));
265 }
266 }
267 Poll::Pending
268 }
269}
270
271fn gen_id() -> ByteString {
272 const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
273
274 let mut rng = nanorand::tls_rng();
275 let mut id = String::with_capacity(16);
276 for _ in 0..16 {
277 let idx = rng.generate_range::<usize, _>(..BASE.len());
278 id.push_str(&BASE[idx..idx + 1]);
279 }
280 ByteString::from(id)
281}