use crate::*;
/// Reinterprets a raw address as a shared `'static` reference to a [`Stream`].
impl From<usize> for &'static Stream {
    /// Casts `address` to `*const Stream` and dereferences it.
    ///
    /// No validation is performed on `address`.
    #[inline(always)]
    fn from(address: usize) -> &'static Stream {
        let pointer: *const Stream = address as *const Stream;
        // SAFETY: not verified here. The dereference is only sound when `address`
        // is the address of a live, properly aligned `Stream` that outlives every
        // use of the returned reference; upholding that is left to the caller.
        unsafe { &*pointer }
    }
}
/// Reinterprets a raw address as an exclusive reference to a [`Stream`].
impl<'a> From<usize> for &'a mut Stream {
    /// Casts `address` to `*mut Stream` and dereferences it mutably.
    ///
    /// No validation is performed on `address`, and the lifetime `'a` is chosen
    /// freely by the caller.
    #[inline(always)]
    fn from(address: usize) -> &'a mut Stream {
        let pointer: *mut Stream = address as *mut Stream;
        // SAFETY: not verified here. The dereference is only sound when `address`
        // is the address of a live, properly aligned `Stream` and no other
        // reference to it is used while the returned `&mut` is alive.
        unsafe { &mut *pointer }
    }
}
/// Exposes the memory address of a borrowed [`Stream`] as a `usize`.
impl From<&Stream> for usize {
    /// Returns the address `stream` points to.
    #[inline(always)]
    fn from(stream: &Stream) -> Self {
        // Coerce the reference to a raw pointer first, then take its integer value.
        let pointer: *const Stream = stream;
        pointer as usize
    }
}
/// Exposes the memory address of a mutably borrowed [`Stream`] as a `usize`.
impl From<&mut Stream> for usize {
    /// Returns the address `stream` points to.
    #[inline(always)]
    fn from(stream: &mut Stream) -> Self {
        // Coerce the reference to a raw pointer first, then take its integer value.
        let pointer: *mut Stream = stream;
        pointer as usize
    }
}
/// Identity `AsRef` so a [`Stream`] can be passed where `AsRef<Stream>` is expected.
impl AsRef<Stream> for Stream {
    /// Returns `self` unchanged.
    ///
    /// The reference is handed back directly rather than being rebuilt from its
    /// integer address: the result is the same pointer, but no `unsafe`
    /// dereference is involved and the borrow keeps its real lifetime.
    #[inline(always)]
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Identity `AsMut` so a [`Stream`] can be passed where `AsMut<Stream>` is expected.
impl AsMut<Stream> for Stream {
    /// Returns `self` unchanged.
    ///
    /// The exclusive reference is handed back directly rather than being rebuilt
    /// from its integer address: the result is the same pointer, but no `unsafe`
    /// dereference is involved and the borrow keeps its real lifetime.
    #[inline(always)]
    fn as_mut(&mut self) -> &mut Self {
        self
    }
}
/// Lifetime-erasing accessors for [`Stream`], built on the `usize` address conversions.
impl Lifetime for Stream {
    /// Returns a `'static` shared reference to the same object as `self`.
    ///
    /// # Safety
    ///
    /// The borrow checker no longer ties the result to `self`. The caller must
    /// ensure the `Stream` is still alive whenever the returned reference is used.
    #[inline(always)]
    unsafe fn leak(&self) -> &'static Self {
        // Address of `self`, then back to a reference with the lifetime erased.
        <&'static Self>::from(usize::from(self))
    }
    /// Returns a `'static` exclusive reference to the same object as `self`,
    /// even though only a shared borrow is taken.
    ///
    /// # Safety
    ///
    /// The caller must ensure the `Stream` is still alive whenever the returned
    /// reference is used, and that no other reference to it is used while the
    /// returned `&mut` is live.
    #[inline(always)]
    unsafe fn leak_mut(&self) -> &'static mut Self {
        // Address of `self`, then back to a mutable reference with the lifetime erased.
        <&'static mut Self>::from(usize::from(self))
    }
}
/// Request-reading and byte-writing helpers for [`Stream`].
impl Stream {
    /// Combines the stream's closed flag with the caller's keep-alive wish.
    ///
    /// Returns `true` only when `get_closed()` negates to `true` and
    /// `keep_alive` is `true`.
    #[inline(always)]
    pub fn is_keep_alive(&self, keep_alive: bool) -> bool {
        let still_open: bool = !self.get_closed();
        still_open && keep_alive
    }

    /// Rebuilds a `Box` from the pointer behind `self` and drops it at once,
    /// which runs the destructor and releases the allocation.
    ///
    /// # Safety
    ///
    /// Per the `Box::from_raw` contract, `self` must point into an allocation that
    /// was originally produced by a `Box` and has not been freed yet, and the
    /// `Stream` must not be touched again after this call.
    #[inline(always)]
    pub unsafe fn free(&mut self) {
        let raw: *mut Self = self;
        // SAFETY: delegated to the caller; see the `# Safety` section above.
        drop(unsafe { Box::from_raw(raw) });
    }

    /// Reads one HTTP request from the underlying TCP stream, without a timeout.
    ///
    /// Steps, in order: read the first line, split it into method / path /
    /// version, check the path against the configured maximum size, extract the
    /// query part and the path using the positions of `QUERY` and `HASH`, then
    /// read the headers and finally the body of `content_size` bytes.
    ///
    /// # Errors
    ///
    /// Propagates any error from the line read or from the `Request` parsing
    /// helpers.
    async fn get_http_from_stream(&mut self) -> Result<Request, RequestError> {
        let request_config: RequestConfig = *self.get_request_config();
        let capacity: usize = request_config.get_buffer_size();
        let path_limit: usize = request_config.get_max_path_size();
        let mut buffered: BufReader<&mut TcpStream> =
            BufReader::with_capacity(capacity, self.get_mut_stream());
        let reader: &mut BufReader<&mut TcpStream> = &mut buffered;
        let mut first_line: String = String::with_capacity(capacity);
        AsyncBufReadExt::read_line(reader, &mut first_line).await?;
        let (method, raw_path, version): (RequestMethod, &str, RequestVersion) =
            Request::get_http_first_line(&first_line)?;
        Request::check_http_path_size(raw_path, path_limit)?;
        // Positions of the fragment and query markers inside the raw path.
        let hash_index: Option<usize> = raw_path.find(HASH);
        let query_index: Option<usize> = raw_path.find(QUERY);
        let raw_query: &str = Request::get_http_query(raw_path, query_index, hash_index);
        let querys: RequestQuerys = Request::get_http_querys(raw_query);
        let path: RequestPath = Request::get_http_path(raw_path, query_index, hash_index);
        let (headers, host, content_size): (RequestHeaders, RequestHost, usize) =
            Request::get_http_headers(reader, &request_config).await?;
        let body: RequestBody = Request::get_http_body(reader, content_size).await?;
        Ok(Request {
            method,
            host,
            version,
            path,
            querys,
            headers,
            body,
        })
    }

    /// Reads one HTTP request, applying the configured read timeout.
    ///
    /// When the configured timeout equals `DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS`
    /// the read is awaited without any time limit; otherwise the whole read is
    /// wrapped in `timeout` for that many milliseconds.
    ///
    /// # Errors
    ///
    /// Returns the read/parse error, or the timeout's elapsed error converted
    /// into a `RequestError`.
    pub async fn try_get_http_request(&mut self) -> Result<Request, RequestError> {
        let read_timeout_ms: u64 = self.get_request_config().get_read_timeout_ms();
        if read_timeout_ms != DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS {
            let limit: Duration = Duration::from_millis(read_timeout_ms);
            timeout(limit, self.get_http_from_stream()).await?
        } else {
            self.get_http_from_stream().await
        }
    }

    /// Reads from the stream until a complete text/binary WebSocket message has
    /// been assembled, and returns its body.
    ///
    /// Raw bytes are accumulated in a growing buffer; every frame that can be
    /// decoded from its front is removed and handled by opcode. Ping and pong
    /// frames are skipped, text and binary frames are fed to `build_full_frame`
    /// until it yields a result.
    ///
    /// Unless the configured timeout equals `DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS`,
    /// each individual read is limited to half of the configured timeout,
    /// rounded up.
    ///
    /// # Errors
    ///
    /// * `IncompleteWebSocketFrame(BadRequest)` when a read returns zero bytes.
    /// * `ClientClosedConnection(BadRequest)` when a close frame is decoded.
    /// * `WebSocketOpcodeUnsupported(NotImplemented)` for any other opcode.
    /// * Any error from `get_websocket_from_stream` or `build_full_frame`.
    pub async fn try_get_websocket_request(&mut self) -> Result<RequestBody, RequestError> {
        let request_config: RequestConfig = *self.get_request_config();
        let chunk_size: usize = request_config.get_buffer_size();
        let read_timeout_ms: u64 = request_config.get_read_timeout_ms();
        let mut pending: Vec<u8> = Vec::with_capacity(chunk_size);
        let mut chunk: Vec<u8> = vec![0; chunk_size];
        let mut assembled: Vec<u8> = Vec::new();
        let mut is_client_response: bool = false;
        // Half of the configured timeout, rounded up; `None` disables the limit.
        let duration_opt: Option<Duration> = (read_timeout_ms
            != DEFAULT_LOW_SECURITY_READ_TIMEOUT_MS)
            .then(|| Duration::from_millis(read_timeout_ms / 2 + read_timeout_ms % 2));
        loop {
            let received: Option<usize> = self
                .get_websocket_from_stream(&mut chunk, duration_opt, &mut is_client_response)
                .await?;
            let len: usize = match received {
                Some(len) => len,
                // The read timed out and `PING_FRAME` was written instead; read again.
                None => continue,
            };
            if len == 0 {
                return Err(RequestError::IncompleteWebSocketFrame(
                    HttpStatus::BadRequest,
                ));
            }
            pending.extend_from_slice(&chunk[..len]);
            // Consume every complete frame currently sitting at the front of `pending`.
            while let Some((frame, consumed)) = WebSocketFrame::decode_ws_frame(&pending) {
                is_client_response = true;
                pending.drain(..consumed);
                match frame.get_opcode() {
                    WebSocketOpcode::Text | WebSocketOpcode::Binary => {
                        if let Some(body) = frame.build_full_frame(&mut assembled)? {
                            return Ok(body);
                        }
                    }
                    WebSocketOpcode::Ping | WebSocketOpcode::Pong => {}
                    WebSocketOpcode::Close => {
                        return Err(RequestError::ClientClosedConnection(HttpStatus::BadRequest));
                    }
                    _ => {
                        return Err(RequestError::WebSocketOpcodeUnsupported(
                            HttpStatus::NotImplemented,
                        ));
                    }
                }
            }
        }
    }

    /// Performs a single read from the TCP stream into `buffer`.
    ///
    /// * With `duration_opt == None` the read is awaited without a time limit.
    /// * With `Some(duration)` the read is wrapped in `timeout`. If the timeout
    ///   elapses while `*is_client_response` is `true`, the flag is reset to
    ///   `false`, `PING_FRAME` is written to the stream and `Ok(None)` is
    ///   returned so the caller can retry. If it elapses while the flag is
    ///   `false`, the elapsed error is returned.
    ///
    /// # Returns
    ///
    /// `Ok(Some(len))` with the number of bytes read, or `Ok(None)` after the
    /// timeout path described above.
    ///
    /// # Errors
    ///
    /// The read error, the elapsed error, or the error from writing `PING_FRAME`,
    /// each converted into a `RequestError`.
    pub(crate) async fn get_websocket_from_stream(
        &mut self,
        buffer: &mut [u8],
        duration_opt: Option<Duration>,
        is_client_response: &mut bool,
    ) -> Result<Option<usize>, RequestError> {
        let stream: &mut TcpStream = self.get_mut_stream();
        let duration: Duration = match duration_opt {
            Some(duration) => duration,
            None => return Ok(Some(stream.read(buffer).await?)),
        };
        match timeout(duration, stream.read(buffer)).await {
            Ok(read_result) => Ok(Some(read_result?)),
            Err(elapsed) => {
                if !*is_client_response {
                    return Err(elapsed.into());
                }
                *is_client_response = false;
                self.try_send(&PING_FRAME).await?;
                Ok(None)
            }
        }
    }

    /// Writes all bytes of `data` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns the write error converted into a `ResponseError`.
    pub async fn try_send<D>(&mut self, data: D) -> Result<(), ResponseError>
    where
        D: AsRef<[u8]>,
    {
        let stream = self.get_mut_stream();
        let bytes: &[u8] = data.as_ref();
        stream.write_all(bytes).await?;
        Ok(())
    }

    /// Same as [`Stream::try_send`], but panics instead of returning the error.
    ///
    /// # Panics
    ///
    /// Panics when `try_send` returns an error.
    pub async fn send<D>(&mut self, data: D)
    where
        D: AsRef<[u8]>,
    {
        let outcome: Result<(), ResponseError> = self.try_send(data).await;
        outcome.unwrap();
    }

    /// Writes every item yielded by `data_iter` to the underlying stream, in
    /// iteration order, stopping at the first failed write.
    ///
    /// # Errors
    ///
    /// Returns the first write error converted into a `ResponseError`.
    pub async fn try_send_list<I, D>(&mut self, data_iter: I) -> Result<(), ResponseError>
    where
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let stream: &mut TcpStream = self.get_mut_stream();
        let mut items = data_iter.into_iter();
        while let Some(item) = items.next() {
            stream.write_all(item.as_ref()).await?;
        }
        Ok(())
    }

    /// Same as [`Stream::try_send_list`], but panics instead of returning the error.
    ///
    /// # Panics
    ///
    /// Panics when `try_send_list` returns an error.
    pub async fn send_list<I, D>(&mut self, data_iter: I)
    where
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let outcome: Result<(), ResponseError> = self.try_send_list(data_iter).await;
        outcome.unwrap();
    }

    /// Flushes the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns the flush error converted into a `ResponseError`.
    pub async fn try_flush(&mut self) -> Result<(), ResponseError> {
        self.get_mut_stream().flush().await?;
        Ok(())
    }

    /// Same as [`Stream::try_flush`], but panics instead of returning the error.
    ///
    /// # Panics
    ///
    /// Panics when `try_flush` returns an error.
    pub async fn flush(&mut self) {
        let outcome: Result<(), ResponseError> = self.try_flush().await;
        outcome.unwrap();
    }
}