1use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2
3use nanorand::Rng;
4use ntex_bytes::ByteString;
5use ntex_http::{HeaderMap, Method, uri::Scheme};
6use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
7use ntex_service::cfg::Cfg;
8use ntex_util::{channel::pool, time::Millis, time::Sleep};
9
10use crate::connection::Connection;
11use crate::default::DefaultControlService;
12use crate::dispatcher::Dispatcher;
13use crate::{OperationError, ServiceConfig, codec::Codec};
14
15use super::stream::{HandleService, InflightStorage, RecvStream, SendStream};
16
17#[derive(Clone)]
19pub struct SimpleClient(Rc<ClientRef>);
20
21struct ClientRef {
23 id: ByteString,
24 con: Connection,
25 authority: ByteString,
26 storage: InflightStorage,
27}
28
29impl SimpleClient {
30 pub fn new<T>(io: T, scheme: Scheme, authority: ByteString) -> Self
32 where
33 IoBoxed: From<T>,
34 {
35 let io: IoBoxed = io.into();
36 let cfg = io.shared().get();
37 SimpleClient::with_params(
38 io,
39 cfg,
40 scheme,
41 authority,
42 false,
43 InflightStorage::default(),
44 pool::new(),
45 )
46 }
47
48 pub(super) fn with_params(
49 io: IoBoxed,
50 cfg: Cfg<ServiceConfig>,
51 scheme: Scheme,
52 authority: ByteString,
53 skip_unknown_streams: bool,
54 storage: InflightStorage,
55 pool: pool::Pool<()>,
56 ) -> Self {
57 let codec = Codec::default();
58 let con = Connection::new(
59 false,
60 io.get_ref(),
61 codec,
62 cfg,
63 false,
64 skip_unknown_streams,
65 pool,
66 );
67 con.set_secure(scheme == Scheme::HTTPS);
68
69 let disp = Dispatcher::new(
70 con.clone(),
71 DefaultControlService,
72 HandleService::new(storage.clone()),
73 );
74
75 let fut = IoDispatcher::new(io, con.codec().clone(), disp);
76 let _ = ntex_util::spawn(async move {
77 let _ = fut.await;
78 });
79
80 SimpleClient(Rc::new(ClientRef {
81 con,
82 authority,
83 storage,
84 id: gen_id(),
85 }))
86 }
87
88 #[inline]
89 pub fn id(&self) -> &ByteString {
91 &self.0.id
92 }
93
94 #[inline]
95 pub fn tag(&self) -> &'static str {
97 self.0.con.tag()
98 }
99
100 #[inline]
101 pub async fn send(
103 &self,
104 method: Method,
105 path: ByteString,
106 headers: HeaderMap,
107 eof: bool,
108 ) -> Result<(SendStream, RecvStream), OperationError> {
109 let stream = self
110 .0
111 .con
112 .send_request(self.0.authority.clone(), method, path, headers, eof)
113 .await?;
114
115 Ok(self.0.storage.inflight(stream))
116 }
117
118 #[inline]
119 pub fn is_ready(&self) -> bool {
123 self.0.con.can_create_new_stream()
124 }
125
126 #[inline]
127 pub async fn ready(&self) -> Result<(), OperationError> {
131 self.0.con.ready().await
132 }
133
134 #[inline]
135 pub fn close(&self) {
137 log::debug!("Closing client");
138 self.0.con.disconnect_when_ready()
139 }
140
141 #[inline]
142 pub fn force_close(&self) {
144 self.0.con.close()
145 }
146
147 #[inline]
148 pub fn disconnect(&self) -> ClientDisconnect {
152 ClientDisconnect::new(self.clone())
153 }
154
155 #[inline]
156 pub fn is_closed(&self) -> bool {
158 self.0.con.is_closed()
159 }
160
161 #[inline]
162 pub fn is_disconnecting(&self) -> bool {
164 self.0.con.is_disconnecting()
165 }
166
167 #[inline]
168 pub fn on_disconnect(&self) -> OnDisconnect {
170 self.0.con.io().on_disconnect()
171 }
172
173 #[inline]
174 pub fn authority(&self) -> &ByteString {
176 &self.0.authority
177 }
178
179 pub fn max_streams(&self) -> Option<u32> {
181 self.0.con.max_streams()
182 }
183
184 pub fn active_streams(&self) -> u32 {
186 self.0.con.active_streams()
187 }
188
189 #[doc(hidden)]
190 pub fn pings_count(&self) -> u16 {
192 self.0.con.pings_count()
193 }
194
195 #[doc(hidden)]
196 pub fn io_ref(&self) -> &IoRef {
198 self.0.con.io()
199 }
200
201 #[doc(hidden)]
202 pub fn connection(&self) -> &Connection {
204 &self.0.con
205 }
206}
207
208impl Drop for SimpleClient {
209 fn drop(&mut self) {
210 if Rc::strong_count(&self.0) == 1 {
211 log::debug!("Last h2 client has been dropped, disconnecting");
212 self.0.con.disconnect_when_ready()
213 }
214 }
215}
216
217impl fmt::Debug for SimpleClient {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 f.debug_struct("ntex_h2::SimpleClient")
220 .field("authority", &self.0.authority)
221 .field("connection", &self.0.con)
222 .finish()
223 }
224}
225
226#[derive(Debug)]
227pub struct ClientDisconnect {
228 client: SimpleClient,
229 disconnect: OnDisconnect,
230 timeout: Option<Sleep>,
231}
232
233impl ClientDisconnect {
234 fn new(client: SimpleClient) -> Self {
235 log::debug!("Disconnecting client");
236
237 client.0.con.disconnect_when_ready();
238 ClientDisconnect {
239 disconnect: client.on_disconnect(),
240 timeout: None,
241 client,
242 }
243 }
244
245 pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
246 where
247 Millis: From<T>,
248 {
249 self.timeout = Some(Sleep::new(timeout.into()));
250 self
251 }
252}
253
254impl Drop for ClientDisconnect {
255 fn drop(&mut self) {
256 self.client.0.con.close();
257 }
258}
259
260impl Future for ClientDisconnect {
261 type Output = Result<(), OperationError>;
262
263 #[inline]
264 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265 let mut this = self.as_mut();
266
267 if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
268 return Poll::Ready(this.client.0.con.check_error_with_disconnect());
269 } else if let Some(ref mut sleep) = this.timeout {
270 if sleep.poll_elapsed(cx).is_ready() {
271 this.client.0.con.close();
272 return Poll::Ready(Err(OperationError::Disconnected));
273 }
274 }
275 Poll::Pending
276 }
277}
278
279fn gen_id() -> ByteString {
280 const BASE: &str = "abcdefghijklmnopqrstuvwxyz234567";
281
282 let mut rng = nanorand::tls_rng();
283 let mut id = String::with_capacity(16);
284 for _ in 0..16 {
285 let idx = rng.generate_range::<usize, _>(..BASE.len());
286 id.push_str(&BASE[idx..idx + 1]);
287 }
288 ByteString::from(id)
289}