use futures::io::{BufReader, BufWriter};
use soket::{BoxedError, connection, handshake};
use tokio::{net::{TcpListener, TcpStream}, stream::StreamExt};
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
let mut listener = TcpListener::bind("127.0.0.1:9001").await?;
let mut incoming = listener.incoming();
while let Some(socket) = incoming.next().await {
let mut server = new_server(socket?);
let key = {
let req = server.receive_request().await?;
req.into_key()
};
let accept = handshake::server::Response::Accept { key: &key, protocol: None };
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soket::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soket::Data::Text(n)) => {
assert_eq!(n, message.len());
if let Ok(txt) = std::str::from_utf8(&message) {
sender.send_text(txt).await?;
sender.flush().await?
} else {
break
}
}
Err(connection::Error::Closed) => break,
Err(e) => {
log::error!("connection error: {}", e);
break
}
}
}
}
Ok(())
}
#[cfg(not(feature = "deflate"))]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
}
#[cfg(feature = "deflate")]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(16 * 1024, socket.compat()));
let mut server = handshake::Server::new(socket);
let deflate = soket::extension::deflate::Deflate::new(soket::Mode::Server);
server.add_extension(Box::new(deflate));
server
}