finchers/endpoints/
upgrade.rs

1//! Endpoints for supporting HTTP/1.1 protocol upgrade.
2
3use std::io;
4
5use bytes::{Buf, BufMut};
6use futures::{IntoFuture, Poll};
7use http::header::{HeaderName, HeaderValue};
8use http::response;
9use http::{HttpTryFrom, Response, StatusCode};
10use hyper::upgrade::Upgraded;
11use tokio::io::{AsyncRead, AsyncWrite};
12
13use endpoint::{lazy, Lazy};
14use error;
15use error::Error;
16use output::{Output, OutputContext};
17
/// An asynchronous I/O representing an upgraded HTTP connection.
///
/// This type is currently implemented as a thin wrapper of `hyper::upgrade::Upgraded`;
/// its `Read`, `Write`, `AsyncRead` and `AsyncWrite` implementations all forward
/// to the wrapped value.
#[derive(Debug)]
pub struct UpgradedIo(Upgraded);
23
24impl io::Read for UpgradedIo {
25    #[inline]
26    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
27        self.0.read(buf)
28    }
29}
30
31impl io::Write for UpgradedIo {
32    #[inline]
33    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
34        self.0.write(buf)
35    }
36
37    #[inline]
38    fn flush(&mut self) -> io::Result<()> {
39        self.0.flush()
40    }
41}
42
43impl AsyncRead for UpgradedIo {
44    #[inline]
45    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
46        self.0.prepare_uninitialized_buffer(buf)
47    }
48
49    #[inline]
50    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
51        self.0.read_buf(buf)
52    }
53}
54
55impl AsyncWrite for UpgradedIo {
56    #[inline]
57    fn shutdown(&mut self) -> Poll<(), io::Error> {
58        AsyncWrite::shutdown(&mut self.0)
59    }
60
61    #[inline]
62    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
63        self.0.write_buf(buf)
64    }
65}
66
/// A builder for constructing an upgraded HTTP response.
///
/// The output to be created will spawn a task when it is converted into
/// an HTTP response. The task represents the handler of upgraded protocol.
#[derive(Debug)]
pub struct Builder {
    // Underlying `http` response builder; `Default` seeds its status and
    // `header` adds header fields to it.
    builder: response::Builder,
}
75
76impl Default for Builder {
77    fn default() -> Builder {
78        let mut builder = response::Builder::new();
79        builder.status(StatusCode::SWITCHING_PROTOCOLS);
80
81        Builder { builder }
82    }
83}
84
85impl Builder {
86    /// Creates a new `Builder` with the specified task executor.
87    pub fn new() -> Builder {
88        Default::default()
89    }
90
91    /// Appends a header filed which will be inserted into the response.
92    pub fn header<K, V>(mut self, name: K, value: V) -> Self
93    where
94        HeaderName: HttpTryFrom<K>,
95        HeaderValue: HttpTryFrom<V>,
96    {
97        self.builder.header(name, value);
98        self
99    }
100
101    /// Consumes itself and creates a new `Output` from the specified function.
102    pub fn finish<F, R>(self, on_upgrade: F) -> impl Output
103    where
104        F: FnOnce(UpgradedIo) -> R + Send + 'static,
105        R: IntoFuture<Item = (), Error = ()>,
106        R::Future: Send + 'static,
107    {
108        UpgradeOutput {
109            builder: self,
110            on_upgrade,
111        }
112    }
113}
114
/// The `Output` value returned from `Builder::finish`.
///
/// Pairs the configured response builder with the callback that is handed
/// the upgraded connection.
#[derive(Debug)]
struct UpgradeOutput<F> {
    // Response builder carried over from `Builder::finish`.
    builder: Builder,
    // Callback that is called with the upgraded connection wrapped in `UpgradedIo`.
    on_upgrade: F,
}
120
121impl<F, R> Output for UpgradeOutput<F>
122where
123    F: FnOnce(UpgradedIo) -> R + Send + 'static,
124    R: IntoFuture<Item = (), Error = ()>,
125    R::Future: Send + 'static,
126{
127    type Body = ();
128    type Error = Error;
129
130    fn respond(self, cx: &mut OutputContext<'_>) -> Result<Response<Self::Body>, Self::Error> {
131        let Self {
132            builder: Builder { mut builder },
133            on_upgrade,
134        } = self;
135        cx.input()
136            .body_mut()
137            .upgrade(|upgraded| on_upgrade(UpgradedIo(upgraded)));
138        builder.body(()).map_err(::error::fail)
139    }
140}
141
142/// Create an endpoint which just returns a value of `Builder`.
143pub fn builder() -> Lazy<impl Fn() -> error::Result<Builder>> {
144    lazy(|| Ok(Builder::new()))
145}