Skip to main content

http_type/stream/
impl.rs

1use crate::*;
2
3/// Implementation of `From` trait for converting `usize` address into `&Stream`.
4impl From<usize> for &'static Stream {
5    /// Converts a memory address into a reference to `Stream`.
6    ///
7    /// # Arguments
8    ///
9    /// - `usize` - The memory address of the `Stream` instance.
10    ///
11    /// # Returns
12    ///
13    /// - `&'static Stream` - A reference to the `Stream` at the given address.
14    ///
15    /// # Safety
16    ///
17    /// - The address is guaranteed to be a valid `Stream` instance
18    ///   that was previously converted from a reference and is managed by the runtime.
19    #[inline(always)]
20    fn from(address: usize) -> &'static Stream {
21        unsafe { &*(address as *const Stream) }
22    }
23}
24
25/// Implementation of `From` trait for converting `usize` address into `&mut Stream`.
26impl<'a> From<usize> for &'a mut Stream {
27    /// Converts a memory address into a mutable reference to `Stream`.
28    ///
29    /// # Arguments
30    ///
31    /// - `usize` - The memory address of the `Stream` instance.
32    ///
33    /// # Returns
34    ///
35    /// - `&mut Stream` - A mutable reference to the `Stream` at the given address.
36    ///
37    /// # Safety
38    ///
39    /// - The address is guaranteed to be a valid `Stream` instance
40    ///   that was previously converted from a reference and is managed by the runtime.
41    #[inline(always)]
42    fn from(address: usize) -> &'a mut Stream {
43        unsafe { &mut *(address as *mut Stream) }
44    }
45}
46
47/// Implementation of `From` trait for converting `&Stream` into `usize` address.
48impl From<&Stream> for usize {
49    /// Converts a reference to `Stream` into its memory address.
50    ///
51    /// # Arguments
52    ///
53    /// - `&Stream` - The reference to the `Stream` instance.
54    ///
55    /// # Returns
56    ///
57    /// - `usize` - The memory address of the `Stream` instance.
58    #[inline(always)]
59    fn from(stream: &Stream) -> Self {
60        stream as *const Stream as usize
61    }
62}
63
64/// Implementation of `From` trait for converting `&mut Stream` into `usize` address.
65impl From<&mut Stream> for usize {
66    /// Converts a mutable reference to `Stream` into its memory address.
67    ///
68    /// # Arguments
69    ///
70    /// - `&mut Stream` - The mutable reference to the `Stream` instance.
71    ///
72    /// # Returns
73    ///
74    /// - `usize` - The memory address of the `Stream` instance.
75    #[inline(always)]
76    fn from(stream: &mut Stream) -> Self {
77        stream as *mut Stream as usize
78    }
79}
80
81/// Implementation of `AsRef` trait for `Stream`.
82impl AsRef<Stream> for Stream {
83    /// Converts `&Stream` to `&Stream` via memory address conversion.
84    ///
85    /// # Returns
86    ///
87    /// - `&Stream` - A reference to the `Stream` instance.
88    #[inline(always)]
89    fn as_ref(&self) -> &Self {
90        let address: usize = self.into();
91        address.into()
92    }
93}
94
95/// Implementation of `AsMut` trait for `Stream`.
96impl AsMut<Stream> for Stream {
97    /// Converts `&mut Stream` to `&mut Stream` via memory address conversion.
98    ///
99    /// # Returns
100    ///
101    /// - `&mut Stream` - A mutable reference to the `Stream` instance.
102    #[inline(always)]
103    fn as_mut(&mut self) -> &mut Self {
104        let address: usize = self.into();
105        address.into()
106    }
107}
108
109/// Implementation of `Lifetime` trait for `Stream`.
110impl Lifetime for Stream {
111    /// Converts a reference to the stream into a `'static` reference.
112    ///
113    /// # Returns
114    ///
115    /// - `&'static Self`: A reference to the stream with a `'static` lifetime.
116    ///
117    /// # Safety
118    ///
119    /// - The address is guaranteed to be a valid `Self` instance
120    ///   that was previously converted from a reference and is managed by the runtime.
121    #[inline(always)]
122    unsafe fn leak(&self) -> &'static Self {
123        let address: usize = self.into();
124        address.into()
125    }
126
127    /// Converts a reference to the stream into a `'static` mutable reference.
128    ///
129    /// # Returns
130    ///
131    /// - `&'static mut Self`: A mutable reference to the stream with a `'static` lifetime.
132    ///
133    /// # Safety
134    ///
135    /// - The address is guaranteed to be a valid `Self` instance
136    ///   that was previously converted from a reference and is managed by the runtime.
137    #[inline(always)]
138    unsafe fn leak_mut(&self) -> &'static mut Self {
139        let address: usize = self.into();
140        address.into()
141    }
142}
143
144impl Stream {
145    /// Checks if the connection should be kept alive.
146    ///
147    /// This method evaluates whether the connection should remain open based on
148    /// the closed state and the keep_alive parameter.
149    ///
150    /// # Arguments
151    ///
152    /// - `bool` - Whether keep-alive is enabled for the request.
153    ///
154    /// # Returns
155    ///
156    /// - `bool` - True if the connection should be kept alive, otherwise false.
157    #[inline(always)]
158    pub fn is_keep_alive(&self, keep_alive: bool) -> bool {
159        !self.get_closed() && keep_alive
160    }
161
162    /// Free the context.
163    ///
164    /// # Safety
165    ///
166    /// - The address is guaranteed to be a valid `Self` instance
167    ///   that was previously converted from a reference and is managed by the runtime.
168    #[inline(always)]
169    pub unsafe fn free(&mut self) {
170        let _ = unsafe { Box::from_raw(self) };
171    }
172
173    /// Parses the HTTP request content from the stream.
174    ///
175    /// This is an internal helper function that performs the actual parsing.
176    ///
177    /// # Returns
178    ///
179    /// - `Result<Request, RequestError>`: The parsed request or an error.
180    async fn get_http_from_stream(&mut self) -> Result<Request, RequestError> {
181        let config: RequestConfig = *self.get_request_config();
182        let buffer_size: usize = config.get_buffer_size();
183        let max_path_size: usize = config.get_max_path_size();
184        let reader: &mut BufReader<&mut TcpStream> =
185            &mut BufReader::with_capacity(buffer_size, self.get_mut_stream());
186        let mut line: String = String::with_capacity(buffer_size);
187        AsyncBufReadExt::read_line(reader, &mut line).await?;
188        let (method, path, version): (RequestMethod, &str, RequestVersion) =
189            Request::get_http_first_line(&line)?;
190        Request::check_http_path_size(path, max_path_size)?;
191        let hash_index: Option<usize> = path.find(HASH);
192        let query_index: Option<usize> = path.find(QUERY);
193        let query: &str = Request::get_http_query(path, query_index, hash_index);
194        let querys: RequestQuerys = Request::get_http_querys(query);
195        let path: RequestPath = Request::get_http_path(path, query_index, hash_index);
196        let (headers, host, content_size): (RequestHeaders, RequestHost, usize) =
197            Request::get_http_headers(reader, &config).await?;
198        let body: RequestBody = Request::get_http_body(reader, content_size).await?;
199        Ok(Request {
200            method,
201            host,
202            version,
203            path,
204            querys,
205            headers,
206            body,
207        })
208    }
209
210    /// Parses an HTTP request from a TCP stream.
211    ///
212    /// Wraps the stream in a buffered reader and delegates to `http_from_reader`.
213    /// If the timeout is DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS, no timeout is applied.
214    ///
215    /// # Returns
216    ///
217    /// - `Result<Request, RequestError>` - The parsed request or an error.
218    pub async fn try_get_http_request(&mut self) -> Result<Request, RequestError> {
219        if self.get_closed() {
220            return Err(RequestError::ServerClosedConnection(HttpStatus::BadRequest));
221        }
222        let timeout_ms: u64 = self.get_request_config().get_read_timeout_ms();
223        if timeout_ms == DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS {
224            return self.get_http_from_stream().await;
225        }
226        let duration: Duration = Duration::from_millis(timeout_ms);
227        timeout(duration, self.get_http_from_stream()).await?
228    }
229
230    /// Parses a WebSocket request from a TCP stream.
231    ///
232    /// Wraps the stream in a buffered reader and delegates to `ws_from_reader`.
233    /// If the timeout is DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS, no timeout is applied.
234    ///
235    /// # Returns
236    ///
237    /// - `Result<Request, RequestError>`: The parsed WebSocket request or an error.
238    pub async fn try_get_websocket_request(&mut self) -> Result<RequestBody, RequestError> {
239        if self.get_closed() {
240            return Err(RequestError::ServerClosedConnection(HttpStatus::BadRequest));
241        }
242        let config: RequestConfig = *self.get_request_config();
243        let buffer_size: usize = config.get_buffer_size();
244        let read_timeout_ms: u64 = config.get_read_timeout_ms();
245        let mut dynamic_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
246        let mut temp_buffer: Vec<u8> = vec![0; buffer_size];
247        let mut full_frame: Vec<u8> = Vec::new();
248        let mut is_client_response: bool = false;
249        let duration_opt: Option<Duration> =
250            if read_timeout_ms == DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS {
251                None
252            } else {
253                let adjusted_timeout_ms: u64 = (read_timeout_ms >> 1) + (read_timeout_ms & 1);
254                Some(Duration::from_millis(adjusted_timeout_ms))
255            };
256        loop {
257            let len: usize = match self
258                .get_websocket_from_stream(&mut temp_buffer, duration_opt, &mut is_client_response)
259                .await
260            {
261                Ok(Some(len)) => len,
262                Ok(None) => continue,
263                Err(error) => return Err(error),
264            };
265            if len == 0 {
266                return Err(RequestError::IncompleteWebSocketFrame(
267                    HttpStatus::BadRequest,
268                ));
269            }
270            dynamic_buffer.extend_from_slice(&temp_buffer[..len]);
271            while let Some((frame, consumed)) = WebSocketFrame::decode_ws_frame(&dynamic_buffer) {
272                is_client_response = true;
273                dynamic_buffer.drain(0..consumed);
274                match frame.get_opcode() {
275                    WebSocketOpcode::Close => {
276                        return Err(RequestError::ClientClosedConnection(HttpStatus::BadRequest));
277                    }
278                    WebSocketOpcode::Ping | WebSocketOpcode::Pong => continue,
279                    WebSocketOpcode::Text | WebSocketOpcode::Binary => {
280                        match frame.build_full_frame(&mut full_frame) {
281                            Ok(Some(result)) => return Ok(result),
282                            Ok(None) => continue,
283                            Err(error) => return Err(error),
284                        }
285                    }
286                    _ => {
287                        return Err(RequestError::WebSocketOpcodeUnsupported(
288                            HttpStatus::NotImplemented,
289                        ));
290                    }
291                }
292            }
293        }
294    }
295
296    /// Reads data from the stream with optional timeout handling.
297    ///
298    /// # Arguments
299    ///
300    /// - `&mut [u8]`: The buffer to read data into.
301    /// - `Option<Duration>`: The optional timeout duration. If Some, timeout is applied; if None, no timeout.
302    /// - `&mut bool`: Mutable reference to track if we got a client response.
303    ///
304    /// # Returns
305    ///
306    /// - `Result<Option<usize>, RequestError>`: The number of bytes read, None for timeout/ping, or an error.
307    pub(crate) async fn get_websocket_from_stream(
308        &mut self,
309        buffer: &mut [u8],
310        duration_opt: Option<Duration>,
311        is_client_response: &mut bool,
312    ) -> Result<Option<usize>, RequestError> {
313        let stream: &mut TcpStream = self.get_mut_stream();
314        if let Some(duration) = duration_opt {
315            return match timeout(duration, stream.read(buffer)).await {
316                Ok(result) => match result {
317                    Ok(len) => Ok(Some(len)),
318                    Err(error) => Err(error.into()),
319                },
320                Err(error) => {
321                    if !*is_client_response {
322                        return Err(error.into());
323                    }
324                    *is_client_response = false;
325                    self.try_send(&PING_FRAME).await?;
326                    Ok(None)
327                }
328            };
329        }
330        match stream.read(buffer).await {
331            Ok(len) => Ok(Some(len)),
332            Err(error) => Err(error.into()),
333        }
334    }
335
336    /// Sends data over the stream.
337    ///
338    /// # Arguments
339    ///
340    /// - `AsRef<[u8]>` - The data to send (must implement AsRef<[u8]>).
341    ///
342    /// # Returns
343    ///
344    /// - `Result<(), ResponseError>` - Result indicating success or failure.
345    pub async fn try_send<D>(&mut self, data: D) -> Result<(), ResponseError>
346    where
347        D: AsRef<[u8]>,
348    {
349        if self.get_closed() {
350            return Err(ResponseError::ConnectionClosed);
351        }
352        Ok(self.get_mut_stream().write_all(data.as_ref()).await?)
353    }
354
355    /// Sends data over the stream.
356    ///
357    /// # Arguments
358    ///
359    /// - `AsRef<[u8]>` - The data to send (must implement AsRef<[u8]>).
360    ///
361    /// # Panics
362    ///
363    /// Panics if the write operation fails.
364    pub async fn send<D>(&mut self, data: D)
365    where
366        D: AsRef<[u8]>,
367    {
368        self.try_send(data).await.unwrap();
369    }
370
371    /// Sends multiple data.
372    ///
373    /// # Arguments
374    ///
375    /// - `IntoIterator<Item = AsRef<[u8]>>` - The data list to send.
376    ///
377    /// # Returns
378    ///
379    /// - `Result<(), ResponseError>` - Result indicating success or failure.
380    pub async fn try_send_list<I, D>(&mut self, data_iter: I) -> Result<(), ResponseError>
381    where
382        I: IntoIterator<Item = D>,
383        D: AsRef<[u8]>,
384    {
385        if self.get_closed() {
386            return Err(ResponseError::ConnectionClosed);
387        }
388        let stream: &mut TcpStream = self.get_mut_stream();
389        for data in data_iter {
390            stream.write_all(data.as_ref()).await?;
391        }
392        Ok(())
393    }
394
395    /// Sends multiple data.
396    ///
397    /// # Arguments
398    ///
399    /// - `IntoIterator<Item = AsRef<[u8]>>` - The data list to send.
400    ///
401    /// # Panics
402    ///
403    /// Panics if any write operation fails.
404    pub async fn send_list<I, D>(&mut self, data_iter: I)
405    where
406        I: IntoIterator<Item = D>,
407        D: AsRef<[u8]>,
408    {
409        self.try_send_list(data_iter).await.unwrap();
410    }
411
412    /// Flushes all buffered data to the stream.
413    ///
414    /// # Returns
415    ///
416    /// - `Result<(), ResponseError>` - Result indicating success or failure.
417    pub async fn try_flush(&mut self) -> Result<(), ResponseError> {
418        if self.get_closed() {
419            return Err(ResponseError::ConnectionClosed);
420        }
421        Ok(self.get_mut_stream().flush().await?)
422    }
423
424    /// Flushes all buffered data to the stream.
425    ///
426    /// # Panics
427    ///
428    /// Panics if the flush operation fails.
429    pub async fn flush(&mut self) {
430        self.try_flush().await.unwrap();
431    }
432}