Skip to main content

io_http/rfc9112/
send.rs

1//! I/O-free coroutine sending an HTTP/1.1 request and receiving its
2//! response ([RFC 9112]).
3//!
4//! Serialises the request, drives the read/parse cycle through
5//! [`Http11ReadHeaders`] for the head and selects a body-reading
6//! strategy from the response headers:
7//!
8//! | Strategy     | Trigger                      |
9//! |--------------|------------------------------|
10//! | Chunked      | `Transfer-Encoding: chunked` |
11//! | Fixed-length | `Content-Length: <n>`        |
12//! | Read-to-EOF  | Neither header present       |
13//!
14//! # Example
15//!
16//! ```rust,no_run
17//! use std::{io::{Read, Write}, net::TcpStream};
18//!
19//! use io_http::{
20//!     coroutine::*,
21//!     rfc9110::{request::HttpRequest, send::HttpSendYield},
22//!     rfc9112::send::Http11Send,
23//! };
24//! use url::Url;
25//!
26//! let url = Url::parse("http://example.com/").unwrap();
27//! let request = HttpRequest::get(url.clone())
28//!     .header("Host", url.host_str().unwrap())
29//!     .header("Connection", "close");
30//!
31//! let mut stream = TcpStream::connect("example.com:80").unwrap();
32//! let mut send = Http11Send::new(request);
33//! let mut arg: Option<&[u8]> = None;
34//! let mut buf = [0u8; 4096];
35//!
36//! let (response, keep_alive) = loop {
37//!     match send.resume(arg.take()) {
38//!         HttpCoroutineState::Complete(Ok(out)) => break (out.response, out.keep_alive),
39//!         HttpCoroutineState::Complete(Err(err)) => panic!("{err}"),
40//!         HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
41//!             let n = stream.read(&mut buf).unwrap();
42//!             arg = Some(&buf[..n]);
43//!         }
44//!         HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
45//!             stream.write_all(&bytes).unwrap();
46//!         }
47//!         HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect { url, .. }) => {
48//!             panic!("redirect to {url}");
49//!         }
50//!     }
51//! };
52//!
53//! println!("{}", *response.status);
54//! # let _ = keep_alive;
55//! ```
56//!
57//! [RFC 9112]: https://www.rfc-editor.org/rfc/rfc9112
58
59use core::{fmt, mem};
60
61use alloc::{borrow::ToOwned, string::String, vec::Vec};
62
63use httparse::Error as HttparseError;
64use log::trace;
65use thiserror::Error;
66use url::Url;
67
68use crate::{
69    coroutine::*,
70    http_try,
71    rfc1945::version::HTTP_10,
72    rfc9110::{
73        headers::{CONTENT_LENGTH, LOCATION, TRANSFER_ENCODING},
74        request::HttpRequest,
75        response::HttpResponse,
76        send::{HttpSendOutput, HttpSendYield},
77    },
78    rfc9112::{
79        chunk::{Http11ReadChunks, Http11ReadChunksError},
80        read_headers::{Http11ReadHeaders, Http11ReadHeadersError},
81    },
82};
83
84/// Failure causes during the HTTP/1.1 send flow.
85#[derive(Debug, Error)]
86pub enum Http11SendError {
87    #[error("HTTP/1.1 send failed: reached unexpected EOF")]
88    Eof,
89    #[error("HTTP/1.1 send failed: parse response headers: {0}")]
90    ParseResponseHeaders(HttparseError),
91    #[error("HTTP/1.1 send failed: invalid content length `{0}`")]
92    InvalidContentLength(String),
93    #[error("HTTP/1.1 send failed: {0}")]
94    ReadChunks(#[from] Http11ReadChunksError),
95}
96
97impl From<Http11ReadHeadersError> for Http11SendError {
98    fn from(err: Http11ReadHeadersError) -> Self {
99        match err {
100            Http11ReadHeadersError::Eof => Self::Eof,
101            Http11ReadHeadersError::ParseResponseHeaders(e) => Self::ParseResponseHeaders(e),
102        }
103    }
104}
105
106#[derive(Debug)]
107enum State {
108    ReadHeaders(Http11ReadHeaders),
109    BodyChunks(Http11ReadChunks),
110    BodyLength(usize),
111    BodyEof,
112}
113
114impl fmt::Display for State {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            Self::ReadHeaders(_) => f.write_str("read headers"),
118            Self::BodyChunks(_) => f.write_str("read body chunks"),
119            Self::BodyLength(_) => f.write_str("read body length"),
120            Self::BodyEof => f.write_str("read body until eof"),
121        }
122    }
123}
124
125/// I/O-free coroutine to send an HTTP/1.1 request and receive its response.
126#[derive(Debug)]
127pub struct Http11Send {
128    request_url: Url,
129    state: State,
130    wants_write: Option<Vec<u8>>,
131    is_conn_closed: bool,
132    response: Option<HttpResponse>,
133    buf: Vec<u8>,
134}
135
136impl Http11Send {
137    /// Creates a new coroutine that will send the given request and
138    /// receive its response.
139    pub fn new(req: HttpRequest) -> Self {
140        trace!("prepares HTTP/1.1 request to be sent: {req:?}");
141
142        let request_url = req.url.clone();
143        let bytes = req.to_http_11_vec();
144
145        Self {
146            request_url,
147            state: State::ReadHeaders(Http11ReadHeaders::default()),
148            wants_write: Some(bytes),
149            is_conn_closed: false,
150            response: None,
151            buf: Vec::new(),
152        }
153    }
154
155    fn finish(
156        &self,
157        response: HttpResponse,
158        remaining: Vec<u8>,
159    ) -> HttpCoroutineState<HttpSendYield, Result<HttpSendOutput, Http11SendError>> {
160        let keep_alive = !self.is_conn_closed;
161
162        if response.status.is_redirection() {
163            if let Some(location) = response.header(LOCATION) {
164                if let Ok(url) = self.request_url.join(location) {
165                    let same_scheme = self.request_url.scheme() == url.scheme();
166                    let same_host = self.request_url.host() == url.host()
167                        && self.request_url.port() == url.port();
168                    let same_origin = same_scheme && same_host;
169
170                    return HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
171                        url,
172                        response,
173                        keep_alive,
174                        same_origin,
175                    });
176                }
177            }
178        }
179
180        HttpCoroutineState::Complete(Ok(HttpSendOutput {
181            response,
182            remaining,
183            keep_alive,
184        }))
185    }
186}
187
188impl HttpCoroutine for Http11Send {
189    type Yield = HttpSendYield;
190    type Return = Result<HttpSendOutput, Http11SendError>;
191
192    fn resume(&mut self, mut arg: Option<&[u8]>) -> HttpCoroutineState<Self::Yield, Self::Return> {
193        loop {
194            trace!("http/1.1 send: {}", self.state);
195
196            if let Some(bytes) = self.wants_write.take() {
197                return HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes));
198            }
199
200            match &mut self.state {
201                State::ReadHeaders(rh) => match rh.resume(arg.take()) {
202                    HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
203                        return HttpCoroutineState::Yielded(HttpSendYield::WantsRead);
204                    }
205                    HttpCoroutineState::Yielded(HttpYield::WantsWrite(_)) => {
206                        unreachable!("Http11ReadHeaders never writes");
207                    }
208                    HttpCoroutineState::Complete(Err(err)) => {
209                        return HttpCoroutineState::Complete(Err(err.into()));
210                    }
211                    HttpCoroutineState::Complete(Ok(out)) => {
212                        let mut response = out.response;
213                        let is_http10 = response.version == HTTP_10;
214                        self.is_conn_closed = !out.keep_alive;
215                        let status = *response.status;
216
217                        if status == 204 || status == 304 {
218                            return self.finish(response, out.remaining);
219                        }
220
221                        if !is_http10 {
222                            let chunked = response
223                                .header(TRANSFER_ENCODING)
224                                .is_some_and(|enc| enc.eq_ignore_ascii_case("chunked"));
225                            if chunked {
226                                let mut chunks = Http11ReadChunks::default();
227                                match chunks.resume(Some(&out.remaining)) {
228                                    HttpCoroutineState::Complete(Ok(chunk_out)) => {
229                                        response.body = chunk_out.body;
230                                        return self.finish(response, chunk_out.remaining);
231                                    }
232                                    HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
233                                        self.response = Some(response);
234                                        self.state = State::BodyChunks(chunks);
235                                        return HttpCoroutineState::Yielded(
236                                            HttpSendYield::WantsRead,
237                                        );
238                                    }
239                                    HttpCoroutineState::Yielded(HttpYield::WantsWrite(_)) => {
240                                        unreachable!("Http11ReadChunks never writes");
241                                    }
242                                    HttpCoroutineState::Complete(Err(err)) => {
243                                        return HttpCoroutineState::Complete(Err(err.into()));
244                                    }
245                                }
246                            }
247                        }
248
249                        if let Some(len_str) = response.header(CONTENT_LENGTH) {
250                            let len_str = len_str.trim();
251                            let Ok(len) = len_str.parse::<usize>() else {
252                                let err = Http11SendError::InvalidContentLength(len_str.to_owned());
253                                return HttpCoroutineState::Complete(Err(err));
254                            };
255                            self.buf = out.remaining;
256                            self.response = Some(response);
257                            self.state = State::BodyLength(len);
258                            continue;
259                        }
260
261                        self.buf = out.remaining;
262                        self.response = Some(response);
263                        self.state = State::BodyEof;
264                    }
265                },
266                State::BodyChunks(chunks) => {
267                    let chunk_out = http_try!(chunks, arg.take());
268                    let mut response = self.response.take().expect("response missing");
269                    response.body = chunk_out.body;
270                    return self.finish(response, chunk_out.remaining);
271                }
272                State::BodyLength(len) => {
273                    if let Some(data) = arg.take() {
274                        self.buf.extend_from_slice(data);
275                    }
276
277                    if *len > self.buf.len() {
278                        trace!("received incomplete body {len}/{}", self.buf.len());
279                        return HttpCoroutineState::Yielded(HttpSendYield::WantsRead);
280                    }
281
282                    let body = self.buf.drain(..*len).collect();
283                    let remaining = mem::take(&mut self.buf);
284                    let mut response = self.response.take().expect("response missing");
285                    response.body = body;
286                    return self.finish(response, remaining);
287                }
288                State::BodyEof => match arg.take() {
289                    Some(&[]) => {
290                        let buf = mem::take(&mut self.buf);
291                        let mut response = self.response.take().expect("response missing");
292                        response.body = buf;
293                        return self.finish(response, Vec::new());
294                    }
295                    Some(data) => {
296                        self.buf.extend_from_slice(data);
297                        return HttpCoroutineState::Yielded(HttpSendYield::WantsRead);
298                    }
299                    None => {
300                        return HttpCoroutineState::Yielded(HttpSendYield::WantsRead);
301                    }
302                },
303            }
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;

    /// A chunked response delivered in one read is decoded and completes.
    #[test]
    fn body_chunks_completes() {
        let mut send = new_send("https://example.com");

        let written = expect_wants_write(&mut send, None);
        assert_eq!(written, b"GET / HTTP/1.1\r\ncontent-length: 0\r\n\r\n");

        expect_wants_read(&mut send, None);

        let reply = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n";
        let output = expect_complete_ok(&mut send, Some(reply));
        assert_eq!(output.response.version, "HTTP/1.1");
        assert_eq!(*output.response.status, 200);
        assert_eq!(output.response.body, b"hello world");
        assert!(output.remaining.is_empty());
        assert!(output.keep_alive);
    }

    /// A `Content-Length` response delivered in one read completes.
    #[test]
    fn body_length_completes() {
        let mut send = handshake("https://example.com");

        let reply = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
        let output = expect_complete_ok(&mut send, Some(reply));
        assert_eq!(*output.response.status, 200);
        assert_eq!(output.response.body, b"hello");
        assert!(output.keep_alive);
    }

    /// Without framing headers the body is collected until an empty read.
    #[test]
    fn body_eof_completes() {
        let mut send = handshake("https://example.com");

        expect_wants_read(&mut send, Some(b"HTTP/1.1 200 OK\r\n\r\nhello "));
        expect_wants_read(&mut send, Some(b"world"));

        let output = expect_complete_ok(&mut send, Some(b""));
        assert_eq!(output.response.body, b"hello world");
    }

    /// A non-numeric `Content-Length` is reported with its raw value.
    #[test]
    fn invalid_content_length_errors() {
        let mut send = handshake("https://example.com");

        let reply = b"HTTP/1.1 200 OK\r\nContent-Length: notanumber\r\n\r\n";
        match expect_complete_err(&mut send, Some(reply)) {
            Http11SendError::InvalidContentLength(value) => assert_eq!(value, "notanumber"),
            err => panic!("expected InvalidContentLength, got {err:?}"),
        }
    }

    /// A 301 with a relative `Location` yields a same-origin redirect.
    #[test]
    fn redirect_yields_wants_redirect() {
        let mut send = handshake("http://example.com/old");

        let reply =
            b"HTTP/1.1 301 Moved Permanently\r\nLocation: /new\r\nContent-Length: 0\r\n\r\n";
        match send.resume(Some(reply)) {
            HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
                url,
                same_origin,
                keep_alive,
                ..
            }) => {
                assert_eq!(url.path(), "/new");
                assert!(same_origin);
                assert!(keep_alive);
            }
            state => panic!("expected WantsRedirect, got {state:?}"),
        }
    }

    // --- utils

    /// Builds a coroutine for a GET request to `url`.
    fn new_send(url: &str) -> Http11Send {
        Http11Send::new(HttpRequest::get(url.try_into().unwrap()))
    }

    /// Builds a coroutine for `url` and drives it past the initial
    /// write and the first read request.
    fn handshake(url: &str) -> Http11Send {
        let mut send = new_send(url);
        expect_wants_write(&mut send, None);
        expect_wants_read(&mut send, None);
        send
    }

    /// Resumes `cor` and returns the bytes it asks to be written.
    fn expect_wants_write(cor: &mut Http11Send, arg: Option<&[u8]>) -> Vec<u8> {
        match cor.resume(arg) {
            HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => bytes,
            state => panic!("expected WantsWrite, got {state:?}"),
        }
    }

    /// Resumes `cor` and requires it to ask for more input.
    fn expect_wants_read(cor: &mut Http11Send, arg: Option<&[u8]>) {
        match cor.resume(arg) {
            HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {}
            state => panic!("expected WantsRead, got {state:?}"),
        }
    }

    /// Resumes `cor` and returns its successful output.
    fn expect_complete_ok(cor: &mut Http11Send, arg: Option<&[u8]>) -> HttpSendOutput {
        match cor.resume(arg) {
            HttpCoroutineState::Complete(Ok(out)) => out,
            state => panic!("expected Complete(Ok), got {state:?}"),
        }
    }

    /// Resumes `cor` and returns the error it completed with.
    fn expect_complete_err(cor: &mut Http11Send, arg: Option<&[u8]>) -> Http11SendError {
        match cor.resume(arg) {
            HttpCoroutineState::Complete(Err(err)) => err,
            state => panic!("expected Complete(Err), got {state:?}"),
        }
    }
}