finchers/endpoints/
upgrade.rs1use std::io;
4
5use bytes::{Buf, BufMut};
6use futures::{IntoFuture, Poll};
7use http::header::{HeaderName, HeaderValue};
8use http::response;
9use http::{HttpTryFrom, Response, StatusCode};
10use hyper::upgrade::Upgraded;
11use tokio::io::{AsyncRead, AsyncWrite};
12
13use endpoint::{lazy, Lazy};
14use error;
15use error::Error;
16use output::{Output, OutputContext};
17
18#[derive(Debug)]
22pub struct UpgradedIo(Upgraded);
23
24impl io::Read for UpgradedIo {
25 #[inline]
26 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
27 self.0.read(buf)
28 }
29}
30
31impl io::Write for UpgradedIo {
32 #[inline]
33 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
34 self.0.write(buf)
35 }
36
37 #[inline]
38 fn flush(&mut self) -> io::Result<()> {
39 self.0.flush()
40 }
41}
42
43impl AsyncRead for UpgradedIo {
44 #[inline]
45 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
46 self.0.prepare_uninitialized_buffer(buf)
47 }
48
49 #[inline]
50 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
51 self.0.read_buf(buf)
52 }
53}
54
55impl AsyncWrite for UpgradedIo {
56 #[inline]
57 fn shutdown(&mut self) -> Poll<(), io::Error> {
58 AsyncWrite::shutdown(&mut self.0)
59 }
60
61 #[inline]
62 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
63 self.0.write_buf(buf)
64 }
65}
66
67#[derive(Debug)]
72pub struct Builder {
73 builder: response::Builder,
74}
75
76impl Default for Builder {
77 fn default() -> Builder {
78 let mut builder = response::Builder::new();
79 builder.status(StatusCode::SWITCHING_PROTOCOLS);
80
81 Builder { builder }
82 }
83}
84
85impl Builder {
86 pub fn new() -> Builder {
88 Default::default()
89 }
90
91 pub fn header<K, V>(mut self, name: K, value: V) -> Self
93 where
94 HeaderName: HttpTryFrom<K>,
95 HeaderValue: HttpTryFrom<V>,
96 {
97 self.builder.header(name, value);
98 self
99 }
100
101 pub fn finish<F, R>(self, on_upgrade: F) -> impl Output
103 where
104 F: FnOnce(UpgradedIo) -> R + Send + 'static,
105 R: IntoFuture<Item = (), Error = ()>,
106 R::Future: Send + 'static,
107 {
108 UpgradeOutput {
109 builder: self,
110 on_upgrade,
111 }
112 }
113}
114
115#[derive(Debug)]
116struct UpgradeOutput<F> {
117 builder: Builder,
118 on_upgrade: F,
119}
120
121impl<F, R> Output for UpgradeOutput<F>
122where
123 F: FnOnce(UpgradedIo) -> R + Send + 'static,
124 R: IntoFuture<Item = (), Error = ()>,
125 R::Future: Send + 'static,
126{
127 type Body = ();
128 type Error = Error;
129
130 fn respond(self, cx: &mut OutputContext<'_>) -> Result<Response<Self::Body>, Self::Error> {
131 let Self {
132 builder: Builder { mut builder },
133 on_upgrade,
134 } = self;
135 cx.input()
136 .body_mut()
137 .upgrade(|upgraded| on_upgrade(UpgradedIo(upgraded)));
138 builder.body(()).map_err(::error::fail)
139 }
140}
141
142pub fn builder() -> Lazy<impl Fn() -> error::Result<Builder>> {
144 lazy(|| Ok(Builder::new()))
145}