use std::mem;
use std::sync::Arc;
use std::cell::RefCell;
use std::net::Shutdown;
use std::str::from_utf8;
use std::marker::PhantomData;
use std::collections::HashMap;
use std::result::Result as GenResult;
use std::io::{ErrorKind, Result, Error};
use fnv::FnvBuildHasher;
use http::Response;
use httparse::{EMPTY_HEADER, Request};
use futures::future::{FutureExt, LocalBoxFuture};
use tcp::{Socket, AsyncService, SocketStatus, SocketHandle,
utils::SocketContext};
use crate::{acceptor::{MAX_HANDSHAKE_HTTP_HEADER_LIMIT, WsAcceptor},
connect::WsSocket,
frame::{WsHead, WsFrame},
utils::{ChildProtocol, WsStatus, WsSession}};
pub struct WebsocketListener<S: Socket> {
acceptor: WsAcceptor<S>, protocol: Arc<dyn ChildProtocol<S>>, }
impl<S: Socket> AsyncService<S> for WebsocketListener<S> {
fn handle_connected(&self,
handle: SocketHandle<S>,
status: SocketStatus) -> LocalBoxFuture<'static, ()> {
async move {
if let SocketStatus::Connected(Err(e)) = status {
handle.close(Err(Error::new(ErrorKind::Other,
format!("Websocket connect failed, token: {:?}, remote: {:?}, local: {:?}, reason: {:?}",
handle.get_token(),
handle.get_remote(),
handle.get_local(),
e))));
return;
}
}.boxed_local()
}
fn handle_readed(&self,
handle: SocketHandle<S>,
status: SocketStatus) -> LocalBoxFuture<'static, ()> {
if let SocketStatus::Readed(Err(e)) = status {
return async move {
handle.close(Err(Error::new(ErrorKind::Other,
format!("Websocket read failed, token: {:?}, remote: {:?}, local: {:?}, reason: {:?}",
handle.get_token(),
handle.get_remote(),
handle.get_local(),
e))));
}.boxed_local();
}
if unsafe { (&*handle.get_context().get()).is_empty() } {
let support_protocol = self.protocol.clone();
let acceptor = self.acceptor.clone();
let window_bits = self.acceptor.window_bits();
async move {
WsAcceptor::<S>::accept(handle.clone(),
window_bits,
acceptor,
support_protocol).await;
}.boxed_local()
} else {
let window_bits = self.acceptor.window_bits();
let protocol = self.protocol.clone();
async move {
WsSocket::<S>::handle_readed(&handle,
window_bits,
protocol).await;
}.boxed_local()
}
}
fn handle_writed(&self,
handle: SocketHandle<S>,
status: SocketStatus) -> LocalBoxFuture<'static, ()> {
async move {
if let SocketStatus::Writed(Err(e)) = status {
handle.close(Err(Error::new(ErrorKind::Other,
format!("Websocket write failed, token: {:?}, remote: {:?}, local: {:?}, reason: {:?}",
handle.get_token(),
handle.get_remote(),
handle.get_local(),
e))));
return;
}
WsSocket::handle_writed(handle).await;
}.boxed_local()
}
fn handle_closed(&self,
handle: SocketHandle<S>,
status: SocketStatus) -> LocalBoxFuture<'static, ()> {
let window_bits = self.acceptor.window_bits();
let protocol = self.protocol.clone();
async move {
if let SocketStatus::Closed(result) = status {
WsSocket::handle_closed(handle,
window_bits,
protocol,
result).await;
}
}.boxed_local()
}
fn handle_timeouted(&self,
handle: SocketHandle<S>,
status: SocketStatus) -> LocalBoxFuture<'static, ()> {
let window_bits = self.acceptor.window_bits();
let protocol = self.protocol.clone();
async move {
if let SocketStatus::Timeout(event) = status {
WsSocket::handle_timeouted(handle,
window_bits,
protocol,
event).await;
}
}.boxed_local()
}
}
impl<S: Socket> WebsocketListener<S> {
pub fn with_protocol(protocol: Arc<dyn ChildProtocol<S>>) -> Self {
WebsocketListener {
acceptor: WsAcceptor::default(),
protocol: protocol,
}
}
}