1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_util::time::{Millis, Sleep};
8
9use crate::connection::Connection;
10use crate::default::DefaultControlService;
11use crate::dispatcher::Dispatcher;
12use crate::{codec::Codec, config::Config, OperationError};
13
14use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
15
16#[derive(Clone)]
18pub struct SimpleClient(Rc<ClientRef>);
19
20struct ClientRef {
22 id: ByteString,
23 con: Connection,
24 authority: ByteString,
25 storage: InflightStorage,
26}
27
28impl SimpleClient {
29 pub fn new<T>(io: T, config: Config, scheme: Scheme, authority: ByteString) -> Self
31 where
32 IoBoxed: From<T>,
33 {
34 SimpleClient::with_params(
35 io.into(),
36 config,
37 scheme,
38 authority,
39 InflightStorage::default(),
40 )
41 }
42
43 pub(super) fn with_params(
44 io: IoBoxed,
45 config: Config,
46 scheme: Scheme,
47 authority: ByteString,
48 storage: InflightStorage,
49 ) -> Self {
50 let codec = Codec::default();
51 let con = Connection::new(io.get_ref(), codec, config, false);
52 con.set_secure(scheme == Scheme::HTTPS);
53
54 let disp = Dispatcher::new(
55 con.clone(),
56 DefaultControlService,
57 HandleService::new(storage.clone()),
58 );
59
60 let fut = IoDispatcher::new(
61 io,
62 con.codec().clone(),
63 disp,
64 &con.config().dispatcher_config,
65 );
66 let _ = ntex_util::spawn(async move {
67 let _ = fut.await;
68 });
69
70 SimpleClient(Rc::new(ClientRef {
71 con,
72 authority,
73 storage,
74 id: gen_id(),
75 }))
76 }
77
78 #[inline]
79 pub fn id(&self) -> &ByteString {
81 &self.0.id
82 }
83
84 #[inline]
85 pub fn tag(&self) -> &'static str {
87 self.0.con.tag()
88 }
89
90 #[inline]
91 pub async fn send(
93 &self,
94 method: Method,
95 path: ByteString,
96 headers: HeaderMap,
97 eof: bool,
98 ) -> Result<(SendStream, RecvStream), OperationError> {
99 let stream = self
100 .0
101 .con
102 .send_request(self.0.authority.clone(), method, path, headers, eof)
103 .await?;
104
105 Ok(self.0.storage.inflight(stream))
106 }
107
108 #[inline]
109 pub fn is_ready(&self) -> bool {
113 self.0.con.can_create_new_stream()
114 }
115
116 #[inline]
117 pub async fn ready(&self) -> Result<(), OperationError> {
121 self.0.con.ready().await
122 }
123
124 #[inline]
125 pub fn close(&self) {
127 log::debug!("Closing client");
128 self.0.con.disconnect_when_ready()
129 }
130
131 #[inline]
132 pub fn force_close(&self) {
134 self.0.con.close()
135 }
136
137 #[inline]
138 pub fn disconnect(&self) -> ClientDisconnect {
142 ClientDisconnect::new(self.clone())
143 }
144
145 #[inline]
146 pub fn is_closed(&self) -> bool {
148 self.0.con.is_closed()
149 }
150
151 #[inline]
152 pub fn is_disconnecting(&self) -> bool {
154 self.0.con.is_disconnecting()
155 }
156
157 #[inline]
158 pub fn on_disconnect(&self) -> OnDisconnect {
160 self.0.con.io().on_disconnect()
161 }
162
163 #[inline]
164 pub fn authority(&self) -> &ByteString {
166 &self.0.authority
167 }
168
169 pub fn max_streams(&self) -> Option<u32> {
171 self.0.con.max_streams()
172 }
173
174 pub fn active_streams(&self) -> u32 {
176 self.0.con.active_streams()
177 }
178
179 #[doc(hidden)]
180 pub fn pings_count(&self) -> u16 {
182 self.0.con.pings_count()
183 }
184
185 #[doc(hidden)]
186 pub fn io_ref(&self) -> &IoRef {
188 self.0.con.io()
189 }
190
191 #[doc(hidden)]
192 pub fn connection(&self) -> &Connection {
194 &self.0.con
195 }
196}
197
198impl Drop for SimpleClient {
199 fn drop(&mut self) {
200 if Rc::strong_count(&self.0) == 1 {
201 log::debug!("Last h2 client has been dropped, disconnecting");
202 self.0.con.disconnect_when_ready()
203 }
204 }
205}
206
207impl fmt::Debug for SimpleClient {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("ntex_h2::SimpleClient")
210 .field("authority", &self.0.authority)
211 .field("connection", &self.0.con)
212 .finish()
213 }
214}
215
216#[derive(Debug)]
217pub struct ClientDisconnect {
218 client: SimpleClient,
219 disconnect: OnDisconnect,
220 timeout: Option<Sleep>,
221}
222
223impl ClientDisconnect {
224 fn new(client: SimpleClient) -> Self {
225 log::debug!("Disconnecting client");
226
227 client.0.con.disconnect_when_ready();
228 ClientDisconnect {
229 disconnect: client.on_disconnect(),
230 timeout: None,
231 client,
232 }
233 }
234
235 pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
236 where
237 Millis: From<T>,
238 {
239 self.timeout = Some(Sleep::new(timeout.into()));
240 self
241 }
242}
243
244impl Drop for ClientDisconnect {
245 fn drop(&mut self) {
246 self.client.0.con.close();
247 }
248}
249
250impl Future for ClientDisconnect {
251 type Output = Result<(), OperationError>;
252
253 #[inline]
254 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255 let mut this = self.as_mut();
256
257 if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
258 return Poll::Ready(this.client.0.con.check_error_with_disconnect());
259 } else if let Some(ref mut sleep) = this.timeout {
260 if sleep.poll_elapsed(cx).is_ready() {
261 this.client.0.con.close();
262 return Poll::Ready(Err(OperationError::Disconnected));
263 }
264 }
265 Poll::Pending
266 }
267}
268
269fn gen_id() -> ByteString {
270 const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
271
272 let mut rng = nanorand::tls_rng();
273 let mut id = String::with_capacity(16);
274 for _ in 0..16 {
275 let idx = rng.generate_range::<usize, _>(..BASE.len());
276 id.push_str(&BASE[idx..idx + 1]);
277 }
278 ByteString::from(id)
279}