1#[cfg(any(
9 feature = "rustls-aws",
10 feature = "rustls-ring",
11 feature = "native-tls"
12))]
13use alloc::string::ToString;
14use alloc::{boxed::Box, string::String, vec, vec::Vec};
15
16use std::io::{self, Read, Write};
17
18#[cfg(any(
19 feature = "rustls-aws",
20 feature = "rustls-ring",
21 feature = "native-tls"
22))]
23use pimalaya_stream::{std::stream::StreamStd, tls::Tls};
24use thiserror::Error;
25use url::Url;
26
27use crate::{
28 coroutine::*,
29 rfc1945::send::*,
30 rfc9110::{
31 headers::TRANSFER_ENCODING,
32 request::HttpRequest,
33 response::HttpResponse,
34 send::{HttpSendOutput, HttpSendYield},
35 },
36 rfc9112::{chunk_stream::*, read_headers::*, send::*},
37 sse::frame::*,
38};
39
/// Size in bytes (16 KiB) of the stack buffer used for each `read` call on
/// the underlying stream.
const READ_BUFFER_SIZE: usize = 16 * 1024;
41
/// Returns the default ALPN protocol list.
///
/// The list holds a single entry, `http/1.1`.
pub fn default_alpn() -> Vec<String> {
    Vec::from([String::from("http/1.1")])
}
49
/// Errors returned by [`HttpClientStd`] and [`SseStream`].
#[derive(Debug, Error)]
pub enum HttpClientStdError {
    /// Failure reported by the HTTP/1.0 send coroutine, forwarded as-is.
    #[error(transparent)]
    Http10Send(#[from] Http10SendError),
    /// Failure reported by the HTTP/1.1 send coroutine, forwarded as-is.
    ///
    /// Also used by `send_streaming` for header-reading failures and for an
    /// end of stream hit before the response head is complete.
    #[error(transparent)]
    Http11Send(#[from] Http11SendError),

    /// I/O failure while reading from or writing to the underlying stream.
    #[error(transparent)]
    Io(#[from] io::Error),

    /// Opaque error wrapped from `anyhow::Error`; only present when a TLS
    /// feature is enabled.
    // NOTE(review): presumably produced by the `StreamStd` connection helpers
    // used in `connect` — confirm against `pimalaya_stream`.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error(transparent)]
    Tls(#[from] anyhow::Error),
    /// The URL given to `connect` has no host component; carries the URL.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("HTTP URL `{0}` has no host")]
    UrlMissingHost(String),
    /// The URL given to `connect` uses a scheme other than `http` or
    /// `https`; carries the URL and the offending scheme, in that order.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("HTTP URL `{0}` has unsupported scheme `{1}` (expected `http` or `https`)")]
    UrlUnsupportedScheme(String, String),

    /// The send coroutine asked for a redirect, which `send` and
    /// `send_http10` do not follow; carries the target URL and the status
    /// code of the redirecting response.
    #[error("HTTP server redirected to `{url}` (status `{code}`)")]
    UnexpectedRedirect { url: Url, code: u16 },

    /// `send_streaming` received a response whose `Transfer-Encoding` header
    /// is missing or is not `chunked`; carries the response status code.
    #[error("HTTP streaming requires `Transfer-Encoding: chunked` (got status `{0}`)")]
    StreamingNotChunked(u16),
    /// Failure reported by the chunked-body decoder while streaming.
    #[error(transparent)]
    ChunkStream(#[from] Http11ReadChunksStreamError),
}
91
/// HTTP client that drives the crate's HTTP coroutines over a `std::io`
/// stream.
pub struct HttpClientStd {
    /// Boxed `Read + Write + Send` stream that requests are written to and
    /// responses are read from.
    stream: Box<dyn HttpStream>,
}
96
97impl HttpClientStd {
98 pub fn new<S: Read + Write + Send + 'static>(stream: S) -> Self {
100 Self {
101 stream: Box::new(stream),
102 }
103 }
104
105 #[cfg(any(
108 feature = "rustls-aws",
109 feature = "rustls-ring",
110 feature = "native-tls"
111 ))]
112 pub fn connect(url: &Url, tls: &Tls) -> Result<Self, HttpClientStdError> {
113 let host = url
114 .host_str()
115 .ok_or_else(|| HttpClientStdError::UrlMissingHost(url.to_string()))?;
116
117 let stream = match url.scheme() {
118 "http" => StreamStd::connect_tcp(host, url.port_or_known_default().unwrap_or(80))?,
119 "https" => {
120 StreamStd::connect_tls(host, url.port_or_known_default().unwrap_or(443), tls)?
121 }
122 scheme => {
123 return Err(HttpClientStdError::UrlUnsupportedScheme(
124 url.to_string(),
125 scheme.to_string(),
126 ));
127 }
128 };
129
130 Ok(Self {
131 stream: Box::new(stream),
132 })
133 }
134
135 pub fn set_stream<S: Read + Write + Send + 'static>(&mut self, stream: S) {
138 self.stream = Box::new(stream);
139 }
140
141 pub fn run<C, T, E>(&mut self, mut coroutine: C) -> Result<T, HttpClientStdError>
146 where
147 C: HttpCoroutine<Yield = HttpYield, Return = Result<T, E>>,
148 HttpClientStdError: From<E>,
149 {
150 let mut buf = [0u8; READ_BUFFER_SIZE];
151 let mut arg: Option<&[u8]> = None;
152
153 loop {
154 match coroutine.resume(arg.take()) {
155 HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
156 HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
157 HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
158 let n = self.stream.read(&mut buf)?;
159 arg = Some(&buf[..n]);
160 }
161 HttpCoroutineState::Yielded(HttpYield::WantsWrite(bytes)) => {
162 self.stream.write_all(&bytes)?;
163 arg = None;
164 }
165 }
166 }
167 }
168
169 pub fn send(&mut self, request: HttpRequest) -> Result<HttpSendOutput, HttpClientStdError> {
172 let mut coroutine = Http11Send::new(request);
173 let mut buf = [0u8; READ_BUFFER_SIZE];
174 let mut arg: Option<&[u8]> = None;
175
176 loop {
177 match coroutine.resume(arg.take()) {
178 HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
179 HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
180 HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
181 let n = self.stream.read(&mut buf)?;
182 arg = Some(&buf[..n]);
183 }
184 HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
185 self.stream.write_all(&bytes)?;
186 arg = None;
187 }
188 HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
189 url, response, ..
190 }) => {
191 return Err(HttpClientStdError::UnexpectedRedirect {
192 url,
193 code: *response.status,
194 });
195 }
196 }
197 }
198 }
199
200 pub fn send_http10(
202 &mut self,
203 request: HttpRequest,
204 ) -> Result<HttpSendOutput, HttpClientStdError> {
205 let mut coroutine = Http10Send::new(request);
206 let mut buf = [0u8; READ_BUFFER_SIZE];
207 let mut arg: Option<&[u8]> = None;
208
209 loop {
210 match coroutine.resume(arg.take()) {
211 HttpCoroutineState::Complete(Ok(out)) => return Ok(out),
212 HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
213 HttpCoroutineState::Yielded(HttpSendYield::WantsRead) => {
214 let n = self.stream.read(&mut buf)?;
215 arg = Some(&buf[..n]);
216 }
217 HttpCoroutineState::Yielded(HttpSendYield::WantsWrite(bytes)) => {
218 self.stream.write_all(&bytes)?;
219 arg = None;
220 }
221 HttpCoroutineState::Yielded(HttpSendYield::WantsRedirect {
222 url, response, ..
223 }) => {
224 return Err(HttpClientStdError::UnexpectedRedirect {
225 url,
226 code: *response.status,
227 });
228 }
229 }
230 }
231 }
232}
233
234impl HttpClientStd {
235 pub fn send_streaming(self, request: HttpRequest) -> Result<SseStream, HttpClientStdError> {
238 let HttpClientStd { mut stream } = self;
239
240 let req_bytes = request.to_http_11_vec();
241 stream.write_all(&req_bytes)?;
242
243 let mut read_headers = Http11ReadHeaders::default();
244 let mut buf = [0u8; READ_BUFFER_SIZE];
245 let mut arg: Option<&[u8]> = None;
246
247 let out = loop {
248 match read_headers.resume(arg.take()) {
249 HttpCoroutineState::Complete(Ok(out)) => break out,
250 HttpCoroutineState::Complete(Err(err)) => {
251 return Err(Http11SendError::from(err).into());
252 }
253 HttpCoroutineState::Yielded(HttpYield::WantsRead) => {
254 let n = stream.read(&mut buf)?;
255 if n == 0 {
256 return Err(Http11SendError::Eof.into());
257 }
258 arg = Some(&buf[..n]);
259 }
260 HttpCoroutineState::Yielded(HttpYield::WantsWrite(_)) => {
261 unreachable!("Http11ReadHeaders never writes");
262 }
263 }
264 };
265
266 let chunked = out
267 .response
268 .header(TRANSFER_ENCODING)
269 .is_some_and(|enc| enc.eq_ignore_ascii_case("chunked"));
270
271 if !chunked {
272 return Err(HttpClientStdError::StreamingNotChunked(
273 *out.response.status,
274 ));
275 }
276
277 Ok(SseStream {
278 stream,
279 chunk_stream: Http11ReadChunksStream::default(),
280 sse_parser: SseFrameParser::default(),
281 pending: None,
282 preread: out.remaining,
283 response: out.response,
284 keep_alive: out.keep_alive,
285 done: false,
286 })
287 }
288}
289
/// Reader of Server-Sent Events frames over a chunked HTTP/1.1 response
/// body, created by `HttpClientStd::send_streaming`.
pub struct SseStream {
    /// Underlying stream, taken over from the client.
    stream: Box<dyn HttpStream>,
    /// Decoder turning raw bytes into chunk bodies.
    chunk_stream: Http11ReadChunksStream,
    /// Parser turning chunk bodies into SSE frames.
    sse_parser: SseFrameParser,
    /// Chunk body waiting to be fed to `sse_parser` on its next resume.
    pending: Option<Vec<u8>>,
    /// Bytes read past the response head; fed to `chunk_stream` before any
    /// new read.
    preread: Vec<u8>,
    /// Response head received before streaming started.
    response: HttpResponse,
    /// Keep-alive flag reported by the header reader.
    keep_alive: bool,
    /// Set once the body has ended; `next_frame` then returns `Ok(None)`.
    done: bool,
}
303
304impl SseStream {
305 pub fn response(&self) -> &HttpResponse {
307 &self.response
308 }
309
310 pub fn keep_alive(&self) -> bool {
312 self.keep_alive
313 }
314
315 pub fn last_event_id(&self) -> Option<&str> {
317 self.sse_parser.last_event_id()
318 }
319
320 pub fn next_frame(&mut self) -> Result<Option<SseFrame>, HttpClientStdError> {
323 if self.done {
324 return Ok(None);
325 }
326
327 loop {
328 let arg = self.pending.take();
329 match self.sse_parser.resume(arg.as_deref()) {
330 HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
331 return Ok(Some(frame));
332 }
333 HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
334 match self.pull_chunk()? {
335 Some(body) => self.pending = Some(body),
336 None => {
337 self.done = true;
338 return Ok(None);
339 }
340 }
341 }
342 HttpCoroutineState::Complete(never) => match never {},
343 }
344 }
345 }
346
347 pub fn close(self) {
349 drop(self);
350 }
351
352 fn pull_chunk(&mut self) -> Result<Option<Vec<u8>>, HttpClientStdError> {
353 let mut tmp = [0u8; READ_BUFFER_SIZE];
354 let preread = core::mem::take(&mut self.preread);
355 let mut arg: Option<&[u8]> = if preread.is_empty() {
356 None
357 } else {
358 Some(&preread)
359 };
360
361 loop {
362 match self.chunk_stream.resume(arg.take()) {
363 HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => {
364 return Ok(Some(body));
365 }
366 HttpCoroutineState::Complete(Ok(_remaining)) => return Ok(None),
367 HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead) => {
368 let n = self.stream.read(&mut tmp)?;
369 if n == 0 {
370 return Ok(None);
371 }
372 arg = Some(&tmp[..n]);
373 }
374 HttpCoroutineState::Complete(Err(err)) => return Err(err.into()),
375 }
376 }
377 }
378}
379
380impl Iterator for SseStream {
381 type Item = Result<SseFrame, HttpClientStdError>;
382
383 fn next(&mut self) -> Option<Self::Item> {
384 match self.next_frame() {
385 Ok(Some(frame)) => Some(Ok(frame)),
386 Ok(None) => None,
387 Err(err) => Some(Err(err)),
388 }
389 }
390}
391
/// Object-safe alias for "readable, writable and sendable", so the client
/// can hold any such stream as `Box<dyn HttpStream>`.
trait HttpStream: Read + Write + Send {}

/// Every type that is `Read + Write + Send` is an `HttpStream`, including
/// unsized ones.
impl<T> HttpStream for T where T: Read + Write + Send + ?Sized {}