hyper-tungstenite 0.20.0

websockets for hyper servers using tungstenite
Documentation
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper_tungstenite::tungstenite::Error;
use tokio::net::TcpStream;
use std::net::Ipv6Addr;
use tokio_tungstenite::tungstenite::{Message, Result};
use futures::stream::StreamExt;
use futures::sink::SinkExt;

use assert2::{assert, let_assert};

#[tokio::test]
async fn hyper() {
	// Bind a TCP listener to an ephemeral port.
	let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await);
	let_assert!(Ok(bind_addr) = listener.local_addr());
	let server = hyper::server::conn::http1::Builder::new();

	// Spawn the server in a task.
	tokio::spawn(async move {
		let service = service_fn(upgrade_websocket);
		let_assert!(Ok((stream, _)) = listener.accept().await);
		let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await);
	});

	// Try to create a websocket connection with the server.
	let_assert!(Ok(stream) = TcpStream::connect(bind_addr).await);
	let_assert!(Ok((mut stream, _response)) = tokio_tungstenite::client_async("ws://localhost/foo", stream).await);

	let_assert!(Some(Ok(message)) = stream.next().await);
	assert!(message == Message::text("Hello!"));

	let_assert!(Ok(()) = stream.send(Message::text("Goodbye!")).await);
	assert!(let Some(Ok(Message::Close(None))) = stream.next().await);
}

/// Hyper service handler that upgrades the incoming request to a websocket.
///
/// Asserts that `request` is a websocket upgrade request, starts the upgrade and
/// returns the handshake response. The websocket conversation runs in a separately
/// spawned task: it sends `"Hello!"`, expects `"Goodbye!"` as the reply, and then
/// sends `Message::Close(None)`.
///
/// # Errors
///
/// Returns [`Error::Protocol`] when `hyper_tungstenite::upgrade` fails.
///
/// # Panics
///
/// Panics if `request` is not an upgrade request. Failed assertions inside the
/// spawned task panic that task, not this function.
async fn upgrade_websocket(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
	// `is_upgrade_request` already yields a `bool`; comparing it to `true` is redundant.
	assert!(hyper_tungstenite::is_upgrade_request(&request));

	let (response, stream) = hyper_tungstenite::upgrade(&mut request, None)
		.map_err(Error::Protocol)?;

	// Drive the websocket in its own task so the handshake response below can be
	// returned without waiting for the conversation to finish.
	tokio::spawn(async move {
		let_assert!(Ok(stream) = stream.await);
		let (mut tx, mut rx) = stream.split();
		assert!(let Ok(()) = tx.send(Message::text("Hello!")).await);
		let_assert!(Some(Ok(reply)) = rx.next().await);
		assert!(reply == Message::text("Goodbye!"));
		assert!(let Ok(()) = tx.send(Message::Close(None)).await);
	});

	Ok(response)
}