1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use async_http_codec::{BodyDecode, ResponseHeadEncode, ResponseHeadEncoder};
use async_ws::connection::{WsConfig, WsConnection};
use async_ws::http::upgrade_response;
use futures::prelude::*;
use http::{response, Request, Response};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
/// An incoming request, classified as either a plain HTTP request or a
/// WebSocket upgrade.
pub enum HttpOrWs<IO>
where
    IO: AsyncRead + AsyncWrite + Unpin,
{
    /// A regular HTTP request; the body is still readable through its
    /// [`BodyDecode`].
    Http(Request<BodyDecode<IO>>),
    /// An upgrade request; the "body" is a [`WsAccept`] future that resolves
    /// to the WebSocket connection.
    Ws(Request<WsAccept<IO>>),
}
/// Stream adapter that wraps a stream of HTTP requests and yields each one
/// as an [`HttpOrWs`] item.
#[pin_project]
pub struct HttpOrWsIncoming<IO, T>
where
    IO: AsyncRead + AsyncWrite + Unpin,
    T: Stream<Item = Request<BodyDecode<IO>>> + Unpin,
{
    /// The wrapped stream of incoming HTTP requests.
    incoming: T,
}
impl<IO: AsyncRead + AsyncWrite + Unpin, T: Stream<Item = Request<BodyDecode<IO>>> + Unpin>
HttpOrWsIncoming<IO, T>
{
pub fn new(http_incoming: T) -> Self {
Self {
incoming: http_incoming,
}
}
}
impl<IO, T> Stream for HttpOrWsIncoming<IO, T>
where
    IO: AsyncRead + AsyncWrite + Unpin,
    T: Stream<Item = Request<BodyDecode<IO>>> + Unpin,
{
    type Item = HttpOrWs<IO>;

    /// Polls the wrapped stream once and classifies the request it yields.
    ///
    /// * If `upgrade_response` produces a response for the request, the item
    ///   is [`HttpOrWs::Ws`]: the request head is kept and the body is
    ///   replaced by a [`WsAccept`] that will send that response head.
    /// * Otherwise the request is passed through unchanged as
    ///   [`HttpOrWs::Http`].
    ///
    /// `Ready(None)` and `Pending` from the wrapped stream are forwarded
    /// as-is.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.incoming.poll_next_unpin(cx) {
            Poll::Ready(Some(request)) => {
                let item = match upgrade_response(&request) {
                    Some(response) => {
                        let (request_head, body_read) = request.into_parts();
                        // Only the response head is kept; its body slot is
                        // filled with the first element of `checkpoint()`.
                        // NOTE(review): presumably that element is the raw
                        // transport recovered from the body decoder — confirm.
                        let (response_head, _) = response.into_parts();
                        let (transport, _) = body_read.checkpoint();
                        let response = Response::from_parts(response_head, transport);
                        HttpOrWs::Ws(Request::from_parts(
                            request_head,
                            WsAccept::new(response),
                        ))
                    }
                    None => HttpOrWs::Http(request),
                };
                Poll::Ready(Some(item))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Future that writes the upgrade response head to the transport and then
/// resolves to a [`WsConnection`] over that transport.
pub struct WsAccept<IO>
where
    IO: AsyncRead + AsyncWrite + Unpin,
{
    /// In-flight encoding of the response head onto the transport.
    response: ResponseHeadEncode<IO, response::Parts>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> WsAccept<IO> {
fn new(response: Response<IO>) -> Self {
let (head, transport) = response.into_parts();
Self {
response: ResponseHeadEncoder::default().encode(transport, head),
}
}
}
impl<IO> Future for WsAccept<IO>
where
    IO: AsyncRead + AsyncWrite + Unpin,
{
    type Output = anyhow::Result<WsConnection<IO>>;

    /// Drives the response-head encoder. When it completes successfully, the
    /// returned transport is wrapped in a [`WsConnection`] configured with
    /// `WsConfig::server()`. Errors and `Pending` are forwarded unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.response
            .poll_unpin(cx)
            .map_ok(|(io, _)| WsConnection::with_config(io, WsConfig::server()))
    }
}