async_h1b/server/
mod.rs

1//! Process HTTP connections on the server.
2
3use async_io::Timer;
4use futures_lite::io::{self, AsyncRead as Read, AsyncWrite as Write};
5use futures_lite::prelude::*;
6/*=======
7use async_std::future::{timeout, Future, TimeoutError};
8use async_std::io::{self, Read, Write};
9>>>>>>> origin/v3*/
10
11use http_types::upgrade::Connection;
12use http_types::{
13    headers::{CONNECTION, UPGRADE},
14    Version,
15};
16use http_types::{Request, Response, StatusCode};
17use std::{future::Future, marker::PhantomData, time::Duration};
18mod body_reader;
19mod decode;
20mod encode;
21
22pub use decode::decode;
23pub use encode::Encoder;
24
25/// Configure the server.
26#[derive(Debug, Clone)]
27pub struct ServerOptions {
28    /// Timeout to handle headers. Defaults to 60s.
29    headers_timeout: Option<Duration>,
30    default_host: Option<String>,
31}
32
33impl ServerOptions {
34    /// constructs a new ServerOptions with default settings
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// sets the timeout by which the headers must have been received
40    pub fn with_headers_timeout(mut self, headers_timeout: Duration) -> Self {
41        self.headers_timeout = Some(headers_timeout);
42        self
43    }
44
45    /// Sets the default http 1.0 host for this server. If no host
46    /// header is provided on an http/1.0 request, this host will be
47    /// used to construct the Request Url.
48    ///
49    /// If this is not provided, the server will respond to all
50    /// http/1.0 requests with status `505 http version not
51    /// supported`, whether or not a host header is provided.
52    ///
53    /// The default value for this is None, and as a result async-h1
54    /// is by default an http-1.1-only server.
55    pub fn with_default_host(mut self, default_host: &str) -> Self {
56        self.default_host = Some(default_host.into());
57        self
58    }
59}
60
61impl Default for ServerOptions {
62    fn default() -> Self {
63        Self {
64            headers_timeout: Some(Duration::from_secs(60)),
65            default_host: None,
66        }
67    }
68}
69
70/// Accept a new incoming HTTP/1.1 connection.
71///
72/// Supports `KeepAlive` requests by default.
73pub async fn accept<RW, F, Fut>(io: RW, endpoint: F) -> crate::Result<()>
74where
75    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
76    F: Fn(Request) -> Fut,
77    Fut: Future<Output = Response>,
78{
79    Server::new(io, endpoint).accept().await
80}
81
82/// Accept a new incoming HTTP/1.1 connection.
83///
84/// Supports `KeepAlive` requests by default.
85pub async fn accept_with_opts<RW, F, Fut>(
86    io: RW,
87    endpoint: F,
88    opts: ServerOptions,
89) -> crate::Result<()>
90where
91    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
92    F: Fn(Request) -> Fut,
93    Fut: Future<Output = Response>,
94{
95    Server::new(io, endpoint).with_opts(opts).accept().await
96}
97
98/// struct for server
99#[derive(Debug)]
100pub struct Server<RW, F, Fut> {
101    io: RW,
102    endpoint: F,
103    opts: ServerOptions,
104    _phantom: PhantomData<Fut>,
105}
106
107/// An enum that represents whether the server should accept a subsequent request
108#[derive(Debug, Copy, Clone, Eq, PartialEq)]
109pub enum ConnectionStatus {
110    /// The server should not accept another request
111    Close,
112
113    /// The server may accept another request
114    KeepAlive,
115}
116
117impl<RW, F, Fut> Server<RW, F, Fut>
118where
119    RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
120    F: Fn(Request) -> Fut,
121    Fut: Future<Output = Response>,
122{
123    /// builds a new server
124    pub fn new(io: RW, endpoint: F) -> Self {
125        Self {
126            io,
127            endpoint,
128            opts: Default::default(),
129            _phantom: PhantomData,
130        }
131    }
132
133    /// with opts
134    pub fn with_opts(mut self, opts: ServerOptions) -> Self {
135        self.opts = opts;
136        self
137    }
138
139    /// accept in a loop
140    pub async fn accept(&mut self) -> crate::Result<()> {
141        loop {
142            let result = self.accept_one().await;
143            match result {
144                Ok(status) => {
145                    if status != ConnectionStatus::KeepAlive {
146                        break;
147                    }
148                },
149                Err(err) => {
150                    log::warn!("async-h1 accept_one returns Err: {err:#?}");
151                    return Err(err);
152                }
153            }
154        }
155        Ok(())
156    }
157
158    /// accept one request
159    pub async fn accept_one(&mut self) -> crate::Result<ConnectionStatus>
160    where
161        RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
162        F: Fn(Request) -> Fut,
163        Fut: Future<Output = Response>,
164    {
165        // Decode a new request, timing out if this takes longer than the timeout duration.
166        let fut = decode(self.io.clone(), &self.opts);
167
168        let (req, mut body) = if let Some(timeout_duration) = self.opts.headers_timeout {
169            match fut
170                .or(async {
171                    Timer::after(timeout_duration).await;
172                    Ok(None)
173                })
174                .await
175            {
176                Ok(Some(r)) => r,
177                Ok(None) => return Ok(ConnectionStatus::Close), /* EOF or timeout */
178                Err(e) => return Err(e),
179            }
180        } else {
181            match fut.await? {
182                Some(r) => r,
183                None => return Ok(ConnectionStatus::Close), /* EOF */
184            }
185        };
186
187        let req_version = req.version();
188
189
190        let connection_header =
191            req.header(CONNECTION)
192            .map(|connection| connection.as_str())
193            .unwrap_or("")
194            .to_string();
195
196        let res_header_keepalive = {
197            let c = connection_header.to_ascii_lowercase();
198            if c == "keep-alive" || c.contains("keep-alive,") {
199                "keep-alive"
200            } else if c == "close" || c.contains("close") {
201                "close"
202            } else {
203                match req_version {
204                    Some(Version::Http1_1) => "keep-alive",
205                    Some(Version::Http1_0) => "close",
206                    _ => { unreachable!(); }
207                }
208            }
209        };
210
211        let close_connection =
212            match res_header_keepalive {
213                "close" => true,
214                _ => false
215            };
216        /*
217        let mut close_connection =
218            if req_version == Some(Version::Http1_0) {
219                ! connection_header.eq_ignore_ascii_case("keep-alive")
220            } else {
221                connection_header.eq_ignore_ascii_case("close")
222            };
223        */
224
225
226        let connection_header_is_upgrade = connection_header.split(',').any(|s| s.trim().eq_ignore_ascii_case("upgrade"));
227        let has_upgrade_header = req.header(UPGRADE).is_some();
228        let upgrade_requested = has_upgrade_header && connection_header_is_upgrade;
229
230        let method = req.method();
231
232        // Pass the request to the endpoint and encode the response.
233        let mut response = (self.endpoint)(req).await;
234        response.set_version(req_version);
235
236        /*
237        close_connection |=
238            response.header(CONNECTION)
239            .map(|c| c.as_str().eq_ignore_ascii_case("close"))
240            .unwrap_or(false);
241        */
242
243        let upgrade_provided =
244            response.status() == StatusCode::SwitchingProtocols && response.has_upgrade();
245
246        if ! upgrade_provided {
247            if let Some(hc) = response.header(CONNECTION) {
248                let tmp: Vec<_> = hc.iter().collect();
249                if tmp.len() != 1 {
250                    // is multi "Connection" headers can be properly handled by clients?
251                    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection#syntax
252                    return Err(crate::Error::UnexpectedHeader("should not have multi 'Connection' header"));
253                }
254
255                let mut new_hc = hc.last().as_str().to_string();
256                if new_hc.is_empty() {
257                    new_hc = res_header_keepalive.to_string();
258                } else {
259                    new_hc.push(',');
260                    new_hc.push(' ');
261                    new_hc.extend(res_header_keepalive.chars());
262                }
263                response.insert_header(CONNECTION, new_hc);
264            } else {
265                response.insert_header(CONNECTION, res_header_keepalive);
266            }
267        }
268
269        let upgrade_sender = if upgrade_requested && upgrade_provided {
270            Some(response.send_upgrade())
271        } else {
272            None
273        };
274
275        let mut encoder = Encoder::new(response, method);
276
277        let bytes_written = io::copy(&mut encoder, &mut self.io).await?;
278        log::trace!("wrote {} response bytes", bytes_written);
279
280        let body_bytes_discarded = io::copy(&mut body, &mut io::sink()).await?;
281        log::trace!(
282            "discarded {} unread request body bytes",
283            body_bytes_discarded
284        );
285
286        if let Some(upgrade_sender) = upgrade_sender {
287            upgrade_sender.send(Connection::new(self.io.clone())).await;
288            Ok(ConnectionStatus::Close)
289        } else if close_connection {
290            Ok(ConnectionStatus::Close)
291        } else {
292            Ok(ConnectionStatus::KeepAlive)
293        }
294    }
295}