Skip to main content

io_http/
client.rs

1//! Standard, blocking HTTP/1.X client wrapping a boxed
2//! `Read + Write + Send` stream. Each [`HttpClientStd::send`] /
3//! [`HttpClientStd::send_http10`] is self-contained (HTTP has no
4//! session context). With a TLS feature enabled,
5//! [`HttpClientStd::connect`] opens `http://` / `https://` URLs
6//! end-to-end via [`pimalaya_stream::std::stream::StreamStd`].
7
8#[cfg(any(
9    feature = "rustls-aws",
10    feature = "rustls-ring",
11    feature = "native-tls"
12))]
13use alloc::string::ToString;
14use alloc::{boxed::Box, string::String, vec, vec::Vec};
15
16use std::io::{self, Read, Write};
17
18#[cfg(any(
19    feature = "rustls-aws",
20    feature = "rustls-ring",
21    feature = "native-tls"
22))]
23use pimalaya_stream::{std::stream::StreamStd, tls::Tls};
24use thiserror::Error;
25use url::Url;
26
27use crate::{
28    coroutine::*,
29    rfc1945::send::*,
30    rfc9110::{
31        headers::TRANSFER_ENCODING,
32        request::HttpRequest,
33        response::HttpResponse,
34        send::{HttpSendOutput, HttpSendYield},
35    },
36    rfc9112::{chunk_stream::*, read_headers::*, send::*},
37    sse::frame::*,
38};
39
/// Size in bytes (16 KiB) of the stack scratch buffer each driver loop
/// in this file hands to a single blocking `read` on the stream.
const READ_BUFFER_SIZE: usize = 16 * 1024;
41
/// Returns the ALPN protocol list offered by default on HTTPS
/// connections: the single identifier `http/1.1` ([RFC 7301] + IANA
/// registry).
///
/// [RFC 7301]: https://www.rfc-editor.org/rfc/rfc7301
pub fn default_alpn() -> Vec<String> {
    let http11 = String::from("http/1.1");
    Vec::from([http11])
}
49
/// Errors returned by [`HttpClientStd`].
///
/// Most variants wrap an error produced by one of the coroutines this
/// client drives or by the underlying stream; the remaining ones are
/// raised directly by the client (URL validation, unexpected redirect,
/// non-chunked streaming response).
#[derive(Debug, Error)]
pub enum HttpClientStdError {
    /// The HTTP/1.0 send coroutine completed with an error
    /// (see [`HttpClientStd::send_http10`]).
    #[error(transparent)]
    Http10Send(#[from] Http10SendError),
    /// The HTTP/1.1 send coroutine completed with an error
    /// (see [`HttpClientStd::send`]); also used by
    /// [`HttpClientStd::send_streaming`] for header-read failures and
    /// early end of stream.
    #[error(transparent)]
    Http11Send(#[from] Http11SendError),

    /// Reading from or writing to the wrapped stream failed.
    #[error(transparent)]
    Io(#[from] io::Error),

    /// Opaque `anyhow` error; only exists when a TLS feature is enabled.
    // NOTE(review): presumably produced by the `StreamStd` connect
    // helpers used in `connect` — confirm against `pimalaya_stream`.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error(transparent)]
    Tls(#[from] anyhow::Error),
    /// The URL given to [`HttpClientStd::connect`] has no host
    /// component; carries the full URL as text.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("HTTP URL `{0}` has no host")]
    UrlMissingHost(String),
    /// The URL given to [`HttpClientStd::connect`] uses a scheme other
    /// than `http` / `https`; carries the full URL, then the scheme.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("HTTP URL `{0}` has unsupported scheme `{1}` (expected `http` or `https`)")]
    UrlUnsupportedScheme(String, String),

    /// The send coroutine asked to follow a redirect; this client does
    /// not follow redirects and reports the target URL and the
    /// response status code instead.
    #[error("HTTP server redirected to `{url}` (status `{code}`)")]
    UnexpectedRedirect { url: Url, code: u16 },

    /// [`HttpClientStd::send_streaming`] received a response whose
    /// `Transfer-Encoding` header is absent or not `chunked`; carries
    /// the response status code.
    #[error("HTTP streaming requires `Transfer-Encoding: chunked` (got status `{0}`)")]
    StreamingNotChunked(u16),
    /// The chunked-body decoder behind [`SseStream`] completed with an
    /// error.
    #[error(transparent)]
    ChunkStream(#[from] Http11ReadChunksStreamError),
}
91
/// Std-blocking HTTP client wrapping a boxed `Read + Write + Send` stream.
///
/// The client owns the stream; it can be swapped with
/// [`HttpClientStd::set_stream`] or handed over to an [`SseStream`] by
/// [`HttpClientStd::send_streaming`].
pub struct HttpClientStd {
    /// Type-erased transport that requests are written to and
    /// responses are read from.
    stream: Box<dyn HttpStream>,
}
96
97impl HttpClientStd {
98    /// Wraps a pre-connected stream; caller handles TCP and TLS.
99    pub fn new<S: Read + Write + Send + 'static>(stream: S) -> Self {
100        Self {
101            stream: Box::new(stream),
102        }
103    }
104
105    /// Connects to `url` (TLS handshake on `https`), reading ALPN from
106    /// `tls.rustls.alpn` (see [`default_alpn`]).
107    #[cfg(any(
108        feature = "rustls-aws",
109        feature = "rustls-ring",
110        feature = "native-tls"
111    ))]
112    pub fn connect(url: &Url, tls: &Tls) -> Result<Self, HttpClientStdError> {
113        let host = url
114            .host_str()
115            .ok_or_else(|| HttpClientStdError::UrlMissingHost(url.to_string()))?;
116
117        let stream = match url.scheme() {
118            "http" => StreamStd::connect_tcp(host, url.port_or_known_default().unwrap_or(80))?,
119            "https" => {
120                StreamStd::connect_tls(host, url.port_or_known_default().unwrap_or(443), tls)?
121            }
122            scheme => {
123                return Err(HttpClientStdError::UrlUnsupportedScheme(
124                    url.to_string(),
125                    scheme.to_string(),
126                ));
127            }
128        };
129
130        Ok(Self {
131            stream: Box::new(stream),
132        })
133    }
134
135    /// Replaces the underlying stream (e.g. after `Connection: close` or
136    /// a cross-authority redirect).
137    pub fn set_stream<S: Read + Write + Send + 'static>(&mut self, stream: S) {
138        self.stream = Box::new(stream);
139    }
140
141    /// Drives any standard-shape coroutine against the wrapped stream
142    /// until it completes. Coroutines with richer Yield variants
143    /// (`Http*Send`, `Http11ReadChunksStream`, `SseFrameParser`) use
144    /// their own per-method loops below.
145    pub fn run<C, T, E>(&mut self, mut coroutine: C) -> Result<T, HttpClientStdError>
146    where
147        C: HttpCoroutine<Yield = HttpYield, Return = Result<T, E>>,
148        HttpClientStdError: From<E>,
149    {
150        let mut buf = [0u8; READ_BUFFER_SIZE];
151        let mut arg: Option<&[u8]> = None;
152
153        loop {
154            match coroutine.resume(arg.take()) {
155                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
156                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
157                HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
158                    let n = self.stream.read(&mut buf)?;
159                    arg = Some(&buf[..n]);
160                }
161                HttpCoroutineState::Yielded(HttpYield::WantsWrite(bytes)) => {
162                    self.stream.write_all(&bytes)?;
163                    arg = None;
164                }
165            }
166        }
167    }
168
169    /// Runs [`Http11Send`]; surfaces 3xx as
170    /// [`HttpClientStdError::UnexpectedRedirect`].
171    pub fn send(&mut self, request: HttpRequest) -> Result<HttpSendOutput, HttpClientStdError> {
172        let mut coroutine = Http11Send::new(request);
173        let mut buf = [0u8; READ_BUFFER_SIZE];
174        let mut arg: Option<&[u8]> = None;
175
176        loop {
177            match coroutine.resume(arg.take()) {
178                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
179                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
180                HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
181                    let n = self.stream.read(&mut buf)?;
182                    arg = Some(&buf[..n]);
183                }
184                HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
185                    self.stream.write_all(&bytes)?;
186                    arg = None;
187                }
188                HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
189                    url, response, ..
190                }) => {
191                    return Err(HttpClientStdError::UnexpectedRedirect {
192                        url,
193                        code: *response.status,
194                    });
195                }
196            }
197        }
198    }
199
200    /// HTTP/1.0 counterpart of [`Self::send`].
201    pub fn send_http10(
202        &mut self,
203        request: HttpRequest,
204    ) -> Result<HttpSendOutput, HttpClientStdError> {
205        let mut coroutine = Http10Send::new(request);
206        let mut buf = [0u8; READ_BUFFER_SIZE];
207        let mut arg: Option<&[u8]> = None;
208
209        loop {
210            match coroutine.resume(arg.take()) {
211                HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
212                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
213                HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
214                    let n = self.stream.read(&mut buf)?;
215                    arg = Some(&buf[..n]);
216                }
217                HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
218                    self.stream.write_all(&bytes)?;
219                    arg = None;
220                }
221                HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
222                    url, response, ..
223                }) => {
224                    return Err(HttpClientStdError::UnexpectedRedirect {
225                        url,
226                        code: *response.status,
227                    });
228                }
229            }
230        }
231    }
232}
233
234impl HttpClientStd {
235    /// Opens an HTTP/1.1 SSE stream; requires `Transfer-Encoding: chunked`.
236    /// Consumes `self` because the connection is dedicated to the stream.
237    pub fn send_streaming(self, request: HttpRequest) -> Result<SseStream, HttpClientStdError> {
238        let HttpClientStd { mut stream } = self;
239
240        let req_bytes = request.to_http_11_vec();
241        stream.write_all(&req_bytes)?;
242
243        let mut read_headers = Http11ReadHeaders::default();
244        let mut buf = [0u8; READ_BUFFER_SIZE];
245        let mut arg: Option<&[u8]> = None;
246
247        let out = loop {
248            match read_headers.resume(arg.take()) {
249                HttpCoroutineState::Complete(Ok(out)) => break out,
250                HttpCoroutineState::Complete(Err(err)) => {
251                    return Err(Http11SendError::from(err).into());
252                }
253                HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
254                    let n = stream.read(&mut buf)?;
255                    if n == 0 {
256                        return Err(Http11SendError::Eof.into());
257                    }
258                    arg = Some(&buf[..n]);
259                }
260                HttpCoroutineState::Yielded(HttpYield::WantsWrite(_)) => {
261                    unreachable!("Http11ReadHeaders never writes");
262                }
263            }
264        };
265
266        let chunked = out
267            .response
268            .header(TRANSFER_ENCODING)
269            .is_some_and(|enc| enc.eq_ignore_ascii_case("chunked"));
270
271        if !chunked {
272            return Err(HttpClientStdError::StreamingNotChunked(
273                *out.response.status,
274            ));
275        }
276
277        Ok(SseStream {
278            stream,
279            chunk_stream: Http11ReadChunksStream::default(),
280            sse_parser: SseFrameParser::default(),
281            pending: None,
282            preread: out.remaining,
283            response: out.response,
284            keep_alive: out.keep_alive,
285            done: false,
286        })
287    }
288}
289
/// Long-lived HTTP/1.1 Server-Sent Events stream; each
/// [`SseStream::next_frame`] / [`Iterator::next`] blocks until the next
/// event arrives or the connection closes.
///
/// Created by [`HttpClientStd::send_streaming`], which hands over the
/// client's stream together with the already-parsed response head.
pub struct SseStream {
    /// Transport taken over from the originating [`HttpClientStd`].
    stream: Box<dyn HttpStream>,
    /// Coroutine decoding the chunked transfer encoding into chunk bodies.
    chunk_stream: Http11ReadChunksStream,
    /// Coroutine turning decoded body bytes into SSE frames.
    sse_parser: SseFrameParser,
    /// Decoded chunk body waiting to be fed to `sse_parser` on its
    /// next resume.
    pending: Option<Vec<u8>>,
    /// Bytes received past the end of the response headers; fed to
    /// `chunk_stream` before anything else is read from `stream`.
    preread: Vec<u8>,
    /// Parsed response head returned by [`SseStream::response`].
    response: HttpResponse,
    /// Keep-alive flag reported by the header parser.
    keep_alive: bool,
    /// Set once the stream has ended; further `next_frame` calls
    /// return `Ok(None)` without touching the transport.
    done: bool,
}
303
304impl SseStream {
305    /// Parsed response headers (body is the streaming channel itself).
306    pub fn response(&self) -> &HttpResponse {
307        &self.response
308    }
309
310    /// Whether the server signalled the connection can be reused.
311    pub fn keep_alive(&self) -> bool {
312        self.keep_alive
313    }
314
315    /// Last-event-id seen so far; supply via `Last-Event-ID` on reconnect.
316    pub fn last_event_id(&self) -> Option<&str> {
317        self.sse_parser.last_event_id()
318    }
319
320    /// Drives chunked + SSE decoding until the next event; [`None`] on
321    /// connection close or zero-length chunk terminator.
322    pub fn next_frame(&mut self) -> Result<Option<SseFrame>, HttpClientStdError> {
323        if self.done {
324            return Ok(None);
325        }
326
327        loop {
328            let arg = self.pending.take();
329            match self.sse_parser.resume(arg.as_deref()) {
330                HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
331                    return Ok(Some(frame));
332                }
333                HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
334                    match self.pull_chunk()? {
335                        Some(body) => self.pending = Some(body),
336                        None => {
337                            self.done = true;
338                            return Ok(None);
339                        }
340                    }
341                }
342                HttpCoroutineState::Complete(never) => match never {},
343            }
344        }
345    }
346
347    /// Closes the underlying connection (equivalent to dropping `self`).
348    pub fn close(self) {
349        drop(self);
350    }
351
352    fn pull_chunk(&mut self) -> Result<Option<Vec<u8>>, HttpClientStdError> {
353        let mut tmp = [0u8; READ_BUFFER_SIZE];
354        let preread = core::mem::take(&mut self.preread);
355        let mut arg: Option<&[u8]> = if preread.is_empty() {
356            None
357        } else {
358            Some(&preread)
359        };
360
361        loop {
362            match self.chunk_stream.resume(arg.take()) {
363                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => {
364                    return Ok(Some(body));
365                }
366                HttpCoroutineState::Complete(Ok(_remaining)) => return Ok(None),
367                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead) => {
368                    let n = self.stream.read(&mut tmp)?;
369                    if n == 0 {
370                        return Ok(None);
371                    }
372                    arg = Some(&tmp[..n]);
373                }
374                HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
375            }
376        }
377    }
378}
379
380impl Iterator for SseStream {
381    type Item = Result<SseFrame, HttpClientStdError>;
382
383    fn next(&mut self) -> Option<Self::Item> {
384        match self.next_frame() {
385            Ok(Some(frame)) => Some(Ok(frame)),
386            Ok(None) => None,
387            Err(err) => Some(Err(err)),
388        }
389    }
390}
391
// Private marker trait for any transport the client can drive. `Send`
// is a supertrait so that it survives the `Box<dyn HttpStream>` type
// erasure and `HttpClientStd` remains `Send`.
trait HttpStream: Read + Write + Send {}

// Blanket impl: every `Read + Write + Send` type, sized or not, is an
// `HttpStream`.
impl<T> HttpStream for T where T: Read + Write + Send + ?Sized {}