http_type/stream/impl.rs
1use crate::*;
2
3/// Implementation of `From` trait for converting `usize` address into `&Stream`.
4impl From<usize> for &'static Stream {
5 /// Converts a memory address into a reference to `Stream`.
6 ///
7 /// # Arguments
8 ///
9 /// - `usize` - The memory address of the `Stream` instance.
10 ///
11 /// # Returns
12 ///
13 /// - `&'static Stream` - A reference to the `Stream` at the given address.
14 ///
15 /// # Safety
16 ///
17 /// - The address is guaranteed to be a valid `Stream` instance
18 /// that was previously converted from a reference and is managed by the runtime.
19 #[inline(always)]
20 fn from(address: usize) -> &'static Stream {
21 unsafe { &*(address as *const Stream) }
22 }
23}
24
25/// Implementation of `From` trait for converting `usize` address into `&mut Stream`.
26impl<'a> From<usize> for &'a mut Stream {
27 /// Converts a memory address into a mutable reference to `Stream`.
28 ///
29 /// # Arguments
30 ///
31 /// - `usize` - The memory address of the `Stream` instance.
32 ///
33 /// # Returns
34 ///
35 /// - `&mut Stream` - A mutable reference to the `Stream` at the given address.
36 ///
37 /// # Safety
38 ///
39 /// - The address is guaranteed to be a valid `Stream` instance
40 /// that was previously converted from a reference and is managed by the runtime.
41 #[inline(always)]
42 fn from(address: usize) -> &'a mut Stream {
43 unsafe { &mut *(address as *mut Stream) }
44 }
45}
46
47/// Implementation of `From` trait for converting `&Stream` into `usize` address.
48impl From<&Stream> for usize {
49 /// Converts a reference to `Stream` into its memory address.
50 ///
51 /// # Arguments
52 ///
53 /// - `&Stream` - The reference to the `Stream` instance.
54 ///
55 /// # Returns
56 ///
57 /// - `usize` - The memory address of the `Stream` instance.
58 #[inline(always)]
59 fn from(stream: &Stream) -> Self {
60 stream as *const Stream as usize
61 }
62}
63
64/// Implementation of `From` trait for converting `&mut Stream` into `usize` address.
65impl From<&mut Stream> for usize {
66 /// Converts a mutable reference to `Stream` into its memory address.
67 ///
68 /// # Arguments
69 ///
70 /// - `&mut Stream` - The mutable reference to the `Stream` instance.
71 ///
72 /// # Returns
73 ///
74 /// - `usize` - The memory address of the `Stream` instance.
75 #[inline(always)]
76 fn from(stream: &mut Stream) -> Self {
77 stream as *mut Stream as usize
78 }
79}
80
81/// Implementation of `AsRef` trait for `Stream`.
82impl AsRef<Stream> for Stream {
83 /// Converts `&Stream` to `&Stream` via memory address conversion.
84 ///
85 /// # Returns
86 ///
87 /// - `&Stream` - A reference to the `Stream` instance.
88 #[inline(always)]
89 fn as_ref(&self) -> &Self {
90 let address: usize = self.into();
91 address.into()
92 }
93}
94
95/// Implementation of `AsMut` trait for `Stream`.
96impl AsMut<Stream> for Stream {
97 /// Converts `&mut Stream` to `&mut Stream` via memory address conversion.
98 ///
99 /// # Returns
100 ///
101 /// - `&mut Stream` - A mutable reference to the `Stream` instance.
102 #[inline(always)]
103 fn as_mut(&mut self) -> &mut Self {
104 let address: usize = self.into();
105 address.into()
106 }
107}
108
109/// Implementation of `Lifetime` trait for `Stream`.
110impl Lifetime for Stream {
111 /// Converts a reference to the stream into a `'static` reference.
112 ///
113 /// # Returns
114 ///
115 /// - `&'static Self`: A reference to the stream with a `'static` lifetime.
116 ///
117 /// # Safety
118 ///
119 /// - The address is guaranteed to be a valid `Self` instance
120 /// that was previously converted from a reference and is managed by the runtime.
121 #[inline(always)]
122 unsafe fn leak(&self) -> &'static Self {
123 let address: usize = self.into();
124 address.into()
125 }
126
127 /// Converts a reference to the stream into a `'static` mutable reference.
128 ///
129 /// # Returns
130 ///
131 /// - `&'static mut Self`: A mutable reference to the stream with a `'static` lifetime.
132 ///
133 /// # Safety
134 ///
135 /// - The address is guaranteed to be a valid `Self` instance
136 /// that was previously converted from a reference and is managed by the runtime.
137 #[inline(always)]
138 unsafe fn leak_mut(&self) -> &'static mut Self {
139 let address: usize = self.into();
140 address.into()
141 }
142}
143
144impl Stream {
145 /// Checks if the connection should be kept alive.
146 ///
147 /// This method evaluates whether the connection should remain open based on
148 /// the closed state and the keep_alive parameter.
149 ///
150 /// # Arguments
151 ///
152 /// - `bool` - Whether keep-alive is enabled for the request.
153 ///
154 /// # Returns
155 ///
156 /// - `bool` - True if the connection should be kept alive, otherwise false.
157 #[inline(always)]
158 pub fn is_keep_alive(&self, keep_alive: bool) -> bool {
159 !self.get_closed() && keep_alive
160 }
161
162 /// Free the context.
163 ///
164 /// # Safety
165 ///
166 /// - The address is guaranteed to be a valid `Self` instance
167 /// that was previously converted from a reference and is managed by the runtime.
168 #[inline(always)]
169 pub unsafe fn free(&mut self) {
170 let _ = unsafe { Box::from_raw(self) };
171 }
172
173 /// Parses the HTTP request content from the stream.
174 ///
175 /// This is an internal helper function that performs the actual parsing.
176 ///
177 /// # Returns
178 ///
179 /// - `Result<Request, RequestError>`: The parsed request or an error.
180 async fn get_http_from_stream(&mut self) -> Result<Request, RequestError> {
181 let config: RequestConfig = *self.get_request_config();
182 let buffer_size: usize = config.get_buffer_size();
183 let max_path_size: usize = config.get_max_path_size();
184 let reader: &mut BufReader<&mut TcpStream> =
185 &mut BufReader::with_capacity(buffer_size, self.get_mut_stream());
186 let mut line: String = String::with_capacity(buffer_size);
187 AsyncBufReadExt::read_line(reader, &mut line).await?;
188 let (method, path, version): (RequestMethod, &str, RequestVersion) =
189 Request::get_http_first_line(&line)?;
190 Request::check_http_path_size(path, max_path_size)?;
191 let hash_index: Option<usize> = path.find(HASH);
192 let query_index: Option<usize> = path.find(QUERY);
193 let query: &str = Request::get_http_query(path, query_index, hash_index);
194 let querys: RequestQuerys = Request::get_http_querys(query);
195 let path: RequestPath = Request::get_http_path(path, query_index, hash_index);
196 let (headers, host, content_size): (RequestHeaders, RequestHost, usize) =
197 Request::get_http_headers(reader, &config).await?;
198 let body: RequestBody = Request::get_http_body(reader, content_size).await?;
199 Ok(Request {
200 method,
201 host,
202 version,
203 path,
204 querys,
205 headers,
206 body,
207 })
208 }
209
210 /// Parses an HTTP request from a TCP stream.
211 ///
212 /// Wraps the stream in a buffered reader and delegates to `http_from_reader`.
213 /// If the timeout is DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS, no timeout is applied.
214 ///
215 /// # Returns
216 ///
217 /// - `Result<Request, RequestError>` - The parsed request or an error.
218 pub async fn try_get_http_request(&mut self) -> Result<Request, RequestError> {
219 if self.get_closed() {
220 return Err(RequestError::ServerClosedConnection(HttpStatus::BadRequest));
221 }
222 let timeout_ms: u64 = self.get_request_config().get_read_timeout_ms();
223 if timeout_ms == DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS {
224 return self.get_http_from_stream().await;
225 }
226 let duration: Duration = Duration::from_millis(timeout_ms);
227 timeout(duration, self.get_http_from_stream()).await?
228 }
229
230 /// Parses a WebSocket request from a TCP stream.
231 ///
232 /// Wraps the stream in a buffered reader and delegates to `ws_from_reader`.
233 /// If the timeout is DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS, no timeout is applied.
234 ///
235 /// # Returns
236 ///
237 /// - `Result<Request, RequestError>`: The parsed WebSocket request or an error.
238 pub async fn try_get_websocket_request(&mut self) -> Result<RequestBody, RequestError> {
239 if self.get_closed() {
240 return Err(RequestError::ServerClosedConnection(HttpStatus::BadRequest));
241 }
242 let config: RequestConfig = *self.get_request_config();
243 let buffer_size: usize = config.get_buffer_size();
244 let read_timeout_ms: u64 = config.get_read_timeout_ms();
245 let mut dynamic_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
246 let mut temp_buffer: Vec<u8> = vec![0; buffer_size];
247 let mut full_frame: Vec<u8> = Vec::new();
248 let mut is_client_response: bool = false;
249 let duration_opt: Option<Duration> =
250 if read_timeout_ms == DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS {
251 None
252 } else {
253 let adjusted_timeout_ms: u64 = (read_timeout_ms >> 1) + (read_timeout_ms & 1);
254 Some(Duration::from_millis(adjusted_timeout_ms))
255 };
256 loop {
257 let len: usize = match self
258 .get_websocket_from_stream(&mut temp_buffer, duration_opt, &mut is_client_response)
259 .await
260 {
261 Ok(Some(len)) => len,
262 Ok(None) => continue,
263 Err(error) => return Err(error),
264 };
265 if len == 0 {
266 return Err(RequestError::IncompleteWebSocketFrame(
267 HttpStatus::BadRequest,
268 ));
269 }
270 dynamic_buffer.extend_from_slice(&temp_buffer[..len]);
271 while let Some((frame, consumed)) = WebSocketFrame::decode_ws_frame(&dynamic_buffer) {
272 is_client_response = true;
273 dynamic_buffer.drain(0..consumed);
274 match frame.get_opcode() {
275 WebSocketOpcode::Close => {
276 return Err(RequestError::ClientClosedConnection(HttpStatus::BadRequest));
277 }
278 WebSocketOpcode::Ping | WebSocketOpcode::Pong => continue,
279 WebSocketOpcode::Text | WebSocketOpcode::Binary => {
280 match frame.build_full_frame(&mut full_frame) {
281 Ok(Some(result)) => return Ok(result),
282 Ok(None) => continue,
283 Err(error) => return Err(error),
284 }
285 }
286 _ => {
287 return Err(RequestError::WebSocketOpcodeUnsupported(
288 HttpStatus::NotImplemented,
289 ));
290 }
291 }
292 }
293 }
294 }
295
296 /// Reads data from the stream with optional timeout handling.
297 ///
298 /// # Arguments
299 ///
300 /// - `&mut [u8]`: The buffer to read data into.
301 /// - `Option<Duration>`: The optional timeout duration. If Some, timeout is applied; if None, no timeout.
302 /// - `&mut bool`: Mutable reference to track if we got a client response.
303 ///
304 /// # Returns
305 ///
306 /// - `Result<Option<usize>, RequestError>`: The number of bytes read, None for timeout/ping, or an error.
307 pub(crate) async fn get_websocket_from_stream(
308 &mut self,
309 buffer: &mut [u8],
310 duration_opt: Option<Duration>,
311 is_client_response: &mut bool,
312 ) -> Result<Option<usize>, RequestError> {
313 let stream: &mut TcpStream = self.get_mut_stream();
314 if let Some(duration) = duration_opt {
315 return match timeout(duration, stream.read(buffer)).await {
316 Ok(result) => match result {
317 Ok(len) => Ok(Some(len)),
318 Err(error) => Err(error.into()),
319 },
320 Err(error) => {
321 if !*is_client_response {
322 return Err(error.into());
323 }
324 *is_client_response = false;
325 self.try_send(&PING_FRAME).await?;
326 Ok(None)
327 }
328 };
329 }
330 match stream.read(buffer).await {
331 Ok(len) => Ok(Some(len)),
332 Err(error) => Err(error.into()),
333 }
334 }
335
336 /// Sends data over the stream.
337 ///
338 /// # Arguments
339 ///
340 /// - `AsRef<[u8]>` - The data to send (must implement AsRef<[u8]>).
341 ///
342 /// # Returns
343 ///
344 /// - `Result<(), ResponseError>` - Result indicating success or failure.
345 pub async fn try_send<D>(&mut self, data: D) -> Result<(), ResponseError>
346 where
347 D: AsRef<[u8]>,
348 {
349 if self.get_closed() {
350 return Err(ResponseError::ConnectionClosed);
351 }
352 Ok(self.get_mut_stream().write_all(data.as_ref()).await?)
353 }
354
355 /// Sends data over the stream.
356 ///
357 /// # Arguments
358 ///
359 /// - `AsRef<[u8]>` - The data to send (must implement AsRef<[u8]>).
360 ///
361 /// # Panics
362 ///
363 /// Panics if the write operation fails.
364 pub async fn send<D>(&mut self, data: D)
365 where
366 D: AsRef<[u8]>,
367 {
368 self.try_send(data).await.unwrap();
369 }
370
371 /// Sends multiple data.
372 ///
373 /// # Arguments
374 ///
375 /// - `IntoIterator<Item = AsRef<[u8]>>` - The data list to send.
376 ///
377 /// # Returns
378 ///
379 /// - `Result<(), ResponseError>` - Result indicating success or failure.
380 pub async fn try_send_list<I, D>(&mut self, data_iter: I) -> Result<(), ResponseError>
381 where
382 I: IntoIterator<Item = D>,
383 D: AsRef<[u8]>,
384 {
385 if self.get_closed() {
386 return Err(ResponseError::ConnectionClosed);
387 }
388 let stream: &mut TcpStream = self.get_mut_stream();
389 for data in data_iter {
390 stream.write_all(data.as_ref()).await?;
391 }
392 Ok(())
393 }
394
395 /// Sends multiple data.
396 ///
397 /// # Arguments
398 ///
399 /// - `IntoIterator<Item = AsRef<[u8]>>` - The data list to send.
400 ///
401 /// # Panics
402 ///
403 /// Panics if any write operation fails.
404 pub async fn send_list<I, D>(&mut self, data_iter: I)
405 where
406 I: IntoIterator<Item = D>,
407 D: AsRef<[u8]>,
408 {
409 self.try_send_list(data_iter).await.unwrap();
410 }
411
412 /// Flushes all buffered data to the stream.
413 ///
414 /// # Returns
415 ///
416 /// - `Result<(), ResponseError>` - Result indicating success or failure.
417 pub async fn try_flush(&mut self) -> Result<(), ResponseError> {
418 if self.get_closed() {
419 return Err(ResponseError::ConnectionClosed);
420 }
421 Ok(self.get_mut_stream().flush().await?)
422 }
423
424 /// Flushes all buffered data to the stream.
425 ///
426 /// # Panics
427 ///
428 /// Panics if the flush operation fails.
429 pub async fn flush(&mut self) {
430 self.try_flush().await.unwrap();
431 }
432}