edge_http/
io.rs

1use core::cmp::min;
2use core::fmt::{Display, Write as _};
3use core::str;
4
5use embedded_io_async::{ErrorType, Read, Write};
6
7use httparse::Status;
8
9use crate::ws::UpgradeError;
10use crate::{
11    BodyType, ConnectionType, Headers, HeadersMismatchError, Method, RequestHeaders,
12    ResponseHeaders,
13};
14
15pub mod client;
16pub mod server;
17
18/// An error in parsing the headers or the body.
19#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
20pub enum Error<E> {
21    InvalidHeaders,
22    InvalidBody,
23    TooManyHeaders,
24    TooLongHeaders,
25    TooLongBody,
26    IncompleteHeaders,
27    IncompleteBody,
28    InvalidState,
29    ConnectionClosed,
30    HeadersMismatchError(HeadersMismatchError),
31    WsUpgradeError(UpgradeError),
32    Io(E),
33}
34
35pub type ErrorKind = Error<edge_nal::io::ErrorKind>;
36
37impl<E> Error<E>
38where
39    E: edge_nal::io::Error,
40{
41    pub fn erase(&self) -> Error<edge_nal::io::ErrorKind> {
42        match self {
43            Self::InvalidHeaders => Error::InvalidHeaders,
44            Self::InvalidBody => Error::InvalidBody,
45            Self::TooManyHeaders => Error::TooManyHeaders,
46            Self::TooLongHeaders => Error::TooLongHeaders,
47            Self::TooLongBody => Error::TooLongBody,
48            Self::IncompleteHeaders => Error::IncompleteHeaders,
49            Self::IncompleteBody => Error::IncompleteBody,
50            Self::InvalidState => Error::InvalidState,
51            Self::ConnectionClosed => Error::ConnectionClosed,
52            Self::HeadersMismatchError(e) => Error::HeadersMismatchError(*e),
53            Self::WsUpgradeError(e) => Error::WsUpgradeError(*e),
54            Self::Io(e) => Error::Io(e.kind()),
55        }
56    }
57}
58
59impl<E> From<httparse::Error> for Error<E> {
60    fn from(e: httparse::Error) -> Self {
61        match e {
62            httparse::Error::HeaderName => Self::InvalidHeaders,
63            httparse::Error::HeaderValue => Self::InvalidHeaders,
64            httparse::Error::NewLine => Self::InvalidHeaders,
65            httparse::Error::Status => Self::InvalidHeaders,
66            httparse::Error::Token => Self::InvalidHeaders,
67            httparse::Error::TooManyHeaders => Self::TooManyHeaders,
68            httparse::Error::Version => Self::InvalidHeaders,
69        }
70    }
71}
72
73impl<E> From<HeadersMismatchError> for Error<E> {
74    fn from(e: HeadersMismatchError) -> Self {
75        Self::HeadersMismatchError(e)
76    }
77}
78
79impl<E> From<UpgradeError> for Error<E> {
80    fn from(e: UpgradeError) -> Self {
81        Self::WsUpgradeError(e)
82    }
83}
84
85impl<E> embedded_io_async::Error for Error<E>
86where
87    E: embedded_io_async::Error,
88{
89    fn kind(&self) -> embedded_io_async::ErrorKind {
90        match self {
91            Self::Io(e) => e.kind(),
92            _ => embedded_io_async::ErrorKind::Other,
93        }
94    }
95}
96
97impl<E> Display for Error<E>
98where
99    E: Display,
100{
101    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102        match self {
103            Self::InvalidHeaders => write!(f, "Invalid HTTP headers or status line"),
104            Self::InvalidBody => write!(f, "Invalid HTTP body"),
105            Self::TooManyHeaders => write!(f, "Too many HTTP headers"),
106            Self::TooLongHeaders => write!(f, "HTTP headers section is too long"),
107            Self::TooLongBody => write!(f, "HTTP body is too long"),
108            Self::IncompleteHeaders => write!(f, "HTTP headers section is incomplete"),
109            Self::IncompleteBody => write!(f, "HTTP body is incomplete"),
110            Self::InvalidState => write!(f, "Connection is not in requested state"),
111            Self::HeadersMismatchError(e) => write!(f, "Headers mismatch: {e}"),
112            Self::WsUpgradeError(e) => write!(f, "WebSocket upgrade error: {e}"),
113            Self::ConnectionClosed => write!(f, "Connection closed"),
114            Self::Io(e) => write!(f, "{e}"),
115        }
116    }
117}
118
119#[cfg(feature = "defmt")]
120impl<E> defmt::Format for Error<E>
121where
122    E: defmt::Format,
123{
124    fn format(&self, f: defmt::Formatter<'_>) {
125        match self {
126            Self::InvalidHeaders => defmt::write!(f, "Invalid HTTP headers or status line"),
127            Self::InvalidBody => defmt::write!(f, "Invalid HTTP body"),
128            Self::TooManyHeaders => defmt::write!(f, "Too many HTTP headers"),
129            Self::TooLongHeaders => defmt::write!(f, "HTTP headers section is too long"),
130            Self::TooLongBody => defmt::write!(f, "HTTP body is too long"),
131            Self::IncompleteHeaders => defmt::write!(f, "HTTP headers section is incomplete"),
132            Self::IncompleteBody => defmt::write!(f, "HTTP body is incomplete"),
133            Self::InvalidState => defmt::write!(f, "Connection is not in requested state"),
134            Self::HeadersMismatchError(e) => defmt::write!(f, "Headers mismatch: {}", e),
135            Self::WsUpgradeError(e) => defmt::write!(f, "WebSocket upgrade error: {}", e),
136            Self::ConnectionClosed => defmt::write!(f, "Connection closed"),
137            Self::Io(e) => defmt::write!(f, "{}", e),
138        }
139    }
140}
141
142#[cfg(feature = "std")]
143impl<E> std::error::Error for Error<E> where E: std::error::Error {}
144
145impl<'b, const N: usize> RequestHeaders<'b, N> {
146    /// Parse the headers from the input stream
147    pub async fn receive<R>(
148        &mut self,
149        buf: &'b mut [u8],
150        mut input: R,
151        exact: bool,
152    ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
153    where
154        R: Read,
155    {
156        let (read_len, headers_len) =
157            match raw::read_reply_buf::<N, _>(&mut input, buf, true, exact).await {
158                Ok(read_len) => read_len,
159                Err(e) => return Err(e),
160            };
161
162        let mut parser = httparse::Request::new(&mut self.headers.0);
163
164        let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
165
166        let status = match parser.parse(headers_buf) {
167            Ok(status) => status,
168            Err(e) => return Err(e.into()),
169        };
170
171        if let Status::Complete(headers_len2) = status {
172            if headers_len != headers_len2 {
173                unreachable!("Should not happen. HTTP header parsing is indeterminate.")
174            }
175
176            self.http11 = match parser.version {
177                Some(0) => false,
178                Some(1) => true,
179                _ => Err(Error::InvalidHeaders)?,
180            };
181
182            let method_str = parser.method.ok_or(Error::InvalidHeaders)?;
183            self.method = Method::new(method_str).ok_or(Error::InvalidHeaders)?;
184            self.path = parser.path.ok_or(Error::InvalidHeaders)?;
185
186            trace!("Received:\n{}", self);
187
188            Ok((body_buf, read_len - headers_len))
189        } else {
190            unreachable!("Secondary parse of already loaded buffer failed.")
191        }
192    }
193
194    /// Resolve the connection type and body type from the headers
195    pub fn resolve<E>(&self) -> Result<(ConnectionType, BodyType), Error<E>> {
196        self.headers.resolve::<E>(None, true, self.http11)
197    }
198
199    /// Send the headers to the output stream, returning the connection type and body type
200    pub async fn send<W>(
201        &self,
202        chunked_if_unspecified: bool,
203        mut output: W,
204    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
205    where
206        W: Write,
207    {
208        send_request(self.http11, self.method, self.path, &mut output).await?;
209
210        self.headers
211            .send(None, true, self.http11, chunked_if_unspecified, output)
212            .await
213    }
214}
215
216impl<'b, const N: usize> ResponseHeaders<'b, N> {
217    /// Parse the headers from the input stream
218    pub async fn receive<R>(
219        &mut self,
220        buf: &'b mut [u8],
221        mut input: R,
222        exact: bool,
223    ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
224    where
225        R: Read,
226    {
227        let (read_len, headers_len) =
228            raw::read_reply_buf::<N, _>(&mut input, buf, false, exact).await?;
229
230        let mut parser = httparse::Response::new(&mut self.headers.0);
231
232        let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
233
234        let status = parser.parse(headers_buf).map_err(Error::from)?;
235
236        if let Status::Complete(headers_len2) = status {
237            if headers_len != headers_len2 {
238                unreachable!("Should not happen. HTTP header parsing is indeterminate.")
239            }
240
241            self.http11 = match parser.version {
242                Some(0) => false,
243                Some(1) => true,
244                _ => Err(Error::InvalidHeaders)?,
245            };
246
247            self.code = parser.code.ok_or(Error::InvalidHeaders)?;
248            self.reason = parser.reason;
249
250            trace!("Received:\n{}", self);
251
252            Ok((body_buf, read_len - headers_len))
253        } else {
254            unreachable!("Secondary parse of already loaded buffer failed.")
255        }
256    }
257
258    /// Resolve the connection type and body type from the headers
259    pub fn resolve<E>(
260        &self,
261        request_connection_type: ConnectionType,
262    ) -> Result<(ConnectionType, BodyType), Error<E>> {
263        self.headers
264            .resolve::<E>(Some(request_connection_type), false, self.http11)
265    }
266
267    /// Send the headers to the output stream, returning the connection type and body type
268    pub async fn send<W>(
269        &self,
270        request_connection_type: ConnectionType,
271        chunked_if_unspecified: bool,
272        mut output: W,
273    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
274    where
275        W: Write,
276    {
277        send_status(self.http11, self.code, self.reason, &mut output).await?;
278
279        self.headers
280            .send(
281                Some(request_connection_type),
282                false,
283                self.http11,
284                chunked_if_unspecified,
285                output,
286            )
287            .await
288    }
289}
290
291pub(crate) async fn send_request<W>(
292    http11: bool,
293    method: Method,
294    path: &str,
295    mut output: W,
296) -> Result<(), Error<W::Error>>
297where
298    W: Write,
299{
300    // RFC 9112:   request-line   = method SP request-target SP HTTP-version
301
302    output
303        .write_all(method.as_str().as_bytes())
304        .await
305        .map_err(Error::Io)?;
306    output.write_all(b" ").await.map_err(Error::Io)?;
307    output.write_all(path.as_bytes()).await.map_err(Error::Io)?;
308    output.write_all(b" ").await.map_err(Error::Io)?;
309    raw::send_version(&mut output, http11).await?;
310    output.write_all(b"\r\n").await.map_err(Error::Io)?;
311
312    Ok(())
313}
314
315pub(crate) async fn send_status<W>(
316    http11: bool,
317    status: u16,
318    reason: Option<&str>,
319    mut output: W,
320) -> Result<(), Error<W::Error>>
321where
322    W: Write,
323{
324    // RFC 9112:   status-line = HTTP-version SP status-code SP [ reason-phrase ]
325
326    raw::send_version(&mut output, http11).await?;
327    output.write_all(b" ").await.map_err(Error::Io)?;
328    let status_str: heapless::String<5> = unwrap!(status.try_into());
329    output
330        .write_all(status_str.as_bytes())
331        .await
332        .map_err(Error::Io)?;
333    output.write_all(b" ").await.map_err(Error::Io)?;
334    if let Some(reason) = reason {
335        output
336            .write_all(reason.as_bytes())
337            .await
338            .map_err(Error::Io)?;
339    }
340    output.write_all(b"\r\n").await.map_err(Error::Io)?;
341
342    Ok(())
343}
344
345pub(crate) async fn send_headers<'a, H, W>(
346    headers: H,
347    carry_over_connection_type: Option<ConnectionType>,
348    request: bool,
349    http11: bool,
350    chunked_if_unspecified: bool,
351    mut output: W,
352) -> Result<(ConnectionType, BodyType), Error<W::Error>>
353where
354    W: Write,
355    H: IntoIterator<Item = &'a (&'a str, &'a str)>,
356{
357    let (headers_connection_type, headers_body_type) = raw::send_headers(
358        headers
359            .into_iter()
360            .map(|(name, value)| (*name, value.as_bytes())),
361        &mut output,
362    )
363    .await?;
364
365    send_headers_end(
366        headers_connection_type,
367        headers_body_type,
368        carry_over_connection_type,
369        request,
370        http11,
371        chunked_if_unspecified,
372        output,
373    )
374    .await
375}
376
377async fn send_headers_end<W>(
378    headers_connection_type: Option<ConnectionType>,
379    headers_body_type: Option<BodyType>,
380    carry_over_connection_type: Option<ConnectionType>,
381    request: bool,
382    http11: bool,
383    chunked_if_unspecified: bool,
384    mut output: W,
385) -> Result<(ConnectionType, BodyType), Error<W::Error>>
386where
387    W: Write,
388{
389    let connection_type =
390        ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
391
392    let body_type = BodyType::resolve(
393        headers_body_type,
394        connection_type,
395        request,
396        http11,
397        chunked_if_unspecified,
398    )?;
399
400    if headers_connection_type.is_none() {
401        // Send an explicit Connection-Type just in case
402        let (name, value) = connection_type.raw_header();
403
404        raw::send_header(name, value, &mut output).await?;
405    }
406
407    if headers_body_type.is_none() {
408        let mut buf = heapless::String::new();
409
410        if let Some((name, value)) = body_type.raw_header(&mut buf) {
411            // Send explicit body type header just in case or if the body type was upgraded
412            raw::send_header(name, value, &mut output).await?;
413        }
414    }
415
416    raw::send_headers_end(output).await?;
417
418    Ok((connection_type, body_type))
419}
420
421impl<const N: usize> Headers<'_, N> {
422    fn resolve<E>(
423        &self,
424        carry_over_connection_type: Option<ConnectionType>,
425        request: bool,
426        http11: bool,
427    ) -> Result<(ConnectionType, BodyType), Error<E>> {
428        let headers_connection_type = ConnectionType::from_headers(self.iter());
429        let headers_body_type = BodyType::from_headers(self.iter());
430
431        let connection_type =
432            ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
433        let body_type =
434            BodyType::resolve(headers_body_type, connection_type, request, http11, false)?;
435
436        Ok((connection_type, body_type))
437    }
438
439    async fn send<W>(
440        &self,
441        carry_over_connection_type: Option<ConnectionType>,
442        request: bool,
443        http11: bool,
444        chunked_if_unspecified: bool,
445        mut output: W,
446    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
447    where
448        W: Write,
449    {
450        let (headers_connection_type, headers_body_type) =
451            raw::send_headers(self.iter_raw(), &mut output).await?;
452
453        send_headers_end(
454            headers_connection_type,
455            headers_body_type,
456            carry_over_connection_type,
457            request,
458            http11,
459            chunked_if_unspecified,
460            output,
461        )
462        .await
463    }
464}
465
466/// Represents an incoming HTTP request stream body
467///
468/// Implements the `Read` trait to read the body from the stream
469#[allow(private_interfaces)]
470pub enum Body<'b, R> {
471    /// The body is raw and should be read as is (only possible for HTTP responses with connection = Close)
472    Raw(PartiallyRead<'b, R>),
473    /// The body is of a known length (Content-Length)
474    ContentLen(ContentLenRead<PartiallyRead<'b, R>>),
475    /// The body is chunked (Transfer-Encoding: chunked)
476    Chunked(ChunkedRead<'b, PartiallyRead<'b, R>>),
477}
478
479impl<'b, R> Body<'b, R>
480where
481    R: Read,
482{
483    /// Create a new body
484    ///
485    /// Parameters:
486    /// - `body_type`: The type of the body, as resolved using `BodyType::resolve`
487    /// - `buf`: The buffer to use for reading the body
488    /// - `read_len`: The length of the buffer that has already been read when processing the icoming headers
489    /// - `input`: The raw input stream
490    pub fn new(body_type: BodyType, buf: &'b mut [u8], read_len: usize, input: R) -> Self {
491        match body_type {
492            BodyType::Chunked => Body::Chunked(ChunkedRead::new(
493                PartiallyRead::new(&[], input),
494                buf,
495                read_len,
496            )),
497            BodyType::ContentLen(content_len) => Body::ContentLen(ContentLenRead::new(
498                content_len,
499                PartiallyRead::new(&buf[..read_len], input),
500            )),
501            BodyType::Raw => Body::Raw(PartiallyRead::new(&buf[..read_len], input)),
502        }
503    }
504
505    /// Check if the body needs to be closed (i.e. the underlying input stream cannot be re-used for Keep-Alive connections)
506    pub fn needs_close(&self) -> bool {
507        !self.is_complete() || matches!(self, Self::Raw(_))
508    }
509
510    /// Check if the body has been completely read
511    pub fn is_complete(&self) -> bool {
512        match self {
513            Self::Raw(_) => true,
514            Self::ContentLen(r) => r.is_complete(),
515            Self::Chunked(r) => r.is_complete(),
516        }
517    }
518
519    /// Return a mutable reference to the underlying raw reader
520    pub fn as_raw_reader(&mut self) -> &mut R {
521        match self {
522            Self::Raw(r) => &mut r.input,
523            Self::ContentLen(r) => &mut r.input.input,
524            Self::Chunked(r) => &mut r.input.input,
525        }
526    }
527
528    /// Release the body, returning the underlying raw reader
529    pub fn release(self) -> R {
530        match self {
531            Self::Raw(r) => r.release(),
532            Self::ContentLen(r) => r.release().release(),
533            Self::Chunked(r) => r.release().release(),
534        }
535    }
536}
537
538impl<R> ErrorType for Body<'_, R>
539where
540    R: ErrorType,
541{
542    type Error = Error<R::Error>;
543}
544
545impl<R> Read for Body<'_, R>
546where
547    R: Read,
548{
549    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
550        match self {
551            Self::Raw(read) => Ok(read.read(buf).await.map_err(Error::Io)?),
552            Self::ContentLen(read) => Ok(read.read(buf).await?),
553            Self::Chunked(read) => Ok(read.read(buf).await?),
554        }
555    }
556}
557
558pub(crate) struct PartiallyRead<'b, R> {
559    buf: &'b [u8],
560    read_len: usize,
561    input: R,
562}
563
564impl<'b, R> PartiallyRead<'b, R> {
565    pub const fn new(buf: &'b [u8], input: R) -> Self {
566        Self {
567            buf,
568            read_len: 0,
569            input,
570        }
571    }
572
573    // pub fn buf_len(&self) -> usize {
574    //     self.buf.len()
575    // }
576
577    // pub fn as_raw_reader(&mut self) -> &mut R {
578    //     &mut self.input
579    // }
580
581    pub fn release(self) -> R {
582        self.input
583    }
584}
585
586impl<R> ErrorType for PartiallyRead<'_, R>
587where
588    R: ErrorType,
589{
590    type Error = R::Error;
591}
592
593impl<R> Read for PartiallyRead<'_, R>
594where
595    R: Read,
596{
597    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
598        if self.buf.len() > self.read_len {
599            let len = min(buf.len(), self.buf.len() - self.read_len);
600            buf[..len].copy_from_slice(&self.buf[self.read_len..self.read_len + len]);
601
602            self.read_len += len;
603
604            Ok(len)
605        } else {
606            Ok(self.input.read(buf).await?)
607        }
608    }
609}
610
611pub(crate) struct ContentLenRead<R> {
612    content_len: u64,
613    read_len: u64,
614    input: R,
615}
616
617impl<R> ContentLenRead<R> {
618    pub const fn new(content_len: u64, input: R) -> Self {
619        Self {
620            content_len,
621            read_len: 0,
622            input,
623        }
624    }
625
626    pub fn is_complete(&self) -> bool {
627        self.content_len == self.read_len
628    }
629
630    pub fn release(self) -> R {
631        self.input
632    }
633}
634
635impl<R> ErrorType for ContentLenRead<R>
636where
637    R: ErrorType,
638{
639    type Error = Error<R::Error>;
640}
641
642impl<R> Read for ContentLenRead<R>
643where
644    R: Read,
645{
646    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
647        let len = min(buf.len() as _, self.content_len - self.read_len);
648        if len > 0 {
649            let read = self
650                .input
651                .read(&mut buf[..len as _])
652                .await
653                .map_err(Error::Io)?;
654            self.read_len += read as u64;
655
656            Ok(read)
657        } else {
658            Ok(0)
659        }
660    }
661}
662
663pub(crate) struct ChunkedRead<'b, R> {
664    buf: &'b mut [u8],
665    buf_offset: usize,
666    buf_len: usize,
667    input: R,
668    remain: u64,
669    complete: bool,
670}
671
672impl<'b, R> ChunkedRead<'b, R>
673where
674    R: Read,
675{
676    pub fn new(input: R, buf: &'b mut [u8], buf_len: usize) -> Self {
677        Self {
678            buf,
679            buf_offset: 0,
680            buf_len,
681            input,
682            remain: 0,
683            complete: false,
684        }
685    }
686
687    pub fn is_complete(&self) -> bool {
688        self.complete
689    }
690
691    pub fn release(self) -> R {
692        self.input
693    }
694
695    // The elegant pull parser taken from here:
696    // https://github.com/kchmck/uhttp_chunked_bytes.rs/blob/master/src/lib.rs
697    // Changes:
698    // - Converted to async
699    // - Iterators removed
700    // - Simpler error handling
701    // - Consumption of trailer
702    async fn next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
703        if self.complete {
704            return Ok(None);
705        }
706
707        if self.remain == 0 {
708            if let Some(size) = self.parse_size().await? {
709                // If chunk size is zero (final chunk), the stream is finished [RFC7230§4.1].
710                if size == 0 {
711                    self.consume_trailer().await?;
712                    self.complete = true;
713                    return Ok(None);
714                }
715
716                self.remain = size;
717            } else {
718                self.complete = true;
719                return Ok(None);
720            }
721        }
722
723        let next = self.input_fetch().await?;
724        self.remain -= 1;
725
726        // If current chunk is finished, verify it ends with CRLF [RFC7230§4.1].
727        if self.remain == 0 {
728            self.consume_multi(b"\r\n").await?;
729        }
730
731        Ok(Some(next))
732    }
733
734    // Parse the number of bytes in the next chunk.
735    async fn parse_size(&mut self) -> Result<Option<u64>, Error<R::Error>> {
736        let mut digits = [0_u8; 16];
737
738        let slice = match self.parse_digits(&mut digits[..]).await? {
739            // This is safe because the following call to `from_str_radix` does
740            // its own verification on the bytes.
741            Some(s) => unsafe { str::from_utf8_unchecked(s) },
742            None => return Ok(None),
743        };
744
745        let size = u64::from_str_radix(slice, 16).map_err(|_| Error::InvalidBody)?;
746
747        Ok(Some(size))
748    }
749
750    // Extract the hex digits for the current chunk size.
751    async fn parse_digits<'a>(
752        &'a mut self,
753        digits: &'a mut [u8],
754    ) -> Result<Option<&'a [u8]>, Error<R::Error>> {
755        // Number of hex digits that have been extracted.
756        let mut len = 0;
757
758        loop {
759            let b = match self.input_next().await? {
760                Some(b) => b,
761                None => {
762                    return if len == 0 {
763                        // If EOF at the beginning of a new chunk, the stream is finished.
764                        Ok(None)
765                    } else {
766                        Err(Error::IncompleteBody)
767                    };
768                }
769            };
770
771            match b {
772                b'\r' => {
773                    self.consume(b'\n').await?;
774                    break;
775                }
776                b';' => {
777                    self.consume_ext().await?;
778                    break;
779                }
780                _ => {
781                    match digits.get_mut(len) {
782                        Some(d) => *d = b,
783                        None => return Err(Error::InvalidBody),
784                    }
785
786                    len += 1;
787                }
788            }
789        }
790
791        Ok(Some(&digits[..len]))
792    }
793
794    // Consume and discard current chunk extension.
795    // This doesn't check whether the characters up to CRLF actually have correct syntax.
796    async fn consume_ext(&mut self) -> Result<(), Error<R::Error>> {
797        self.consume_header().await?;
798
799        Ok(())
800    }
801
802    // Consume and discard the optional trailer following the last chunk.
803    async fn consume_trailer(&mut self) -> Result<(), Error<R::Error>> {
804        while self.consume_header().await? {}
805
806        Ok(())
807    }
808
809    // Consume and discard each header in the optional trailer following the last chunk.
810    async fn consume_header(&mut self) -> Result<bool, Error<R::Error>> {
811        let mut first = self.input_fetch().await?;
812        let mut len = 1;
813
814        loop {
815            let second = self.input_fetch().await?;
816            len += 1;
817
818            if first == b'\r' && second == b'\n' {
819                return Ok(len > 2);
820            }
821
822            first = second;
823        }
824    }
825
826    // Verify the next bytes in the stream match the expectation.
827    async fn consume_multi(&mut self, bytes: &[u8]) -> Result<(), Error<R::Error>> {
828        for byte in bytes {
829            self.consume(*byte).await?;
830        }
831
832        Ok(())
833    }
834
835    // Verify the next byte in the stream is matching the expectation.
836    async fn consume(&mut self, byte: u8) -> Result<(), Error<R::Error>> {
837        if self.input_fetch().await? == byte {
838            Ok(())
839        } else {
840            Err(Error::InvalidBody)
841        }
842    }
843
844    async fn input_fetch(&mut self) -> Result<u8, Error<R::Error>> {
845        self.input_next().await?.ok_or(Error::IncompleteBody)
846    }
847
848    async fn input_next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
849        if self.buf_offset == self.buf_len {
850            self.buf_len = self.input.read(self.buf).await.map_err(Error::Io)?;
851            self.buf_offset = 0;
852        }
853
854        if self.buf_len > 0 {
855            let byte = self.buf[self.buf_offset];
856            self.buf_offset += 1;
857
858            Ok(Some(byte))
859        } else {
860            Ok(None)
861        }
862    }
863}
864
865impl<R> ErrorType for ChunkedRead<'_, R>
866where
867    R: ErrorType,
868{
869    type Error = Error<R::Error>;
870}
871
872impl<R> Read for ChunkedRead<'_, R>
873where
874    R: Read,
875{
876    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
877        for (index, byte_pos) in buf.iter_mut().enumerate() {
878            if let Some(byte) = self.next().await? {
879                *byte_pos = byte;
880            } else {
881                return Ok(index);
882            }
883        }
884
885        Ok(buf.len())
886    }
887}
888
889/// Represents an outgoing HTTP request stream body
890///
891/// Implements the `Write` trait to write the body to the stream
892#[allow(private_interfaces)]
893pub enum SendBody<W> {
894    /// The body is raw and should be written as is (only possible for HTTP responses with connection = Close)
895    Raw(W),
896    /// The body is of a known length (Content-Length)
897    ContentLen(ContentLenWrite<W>),
898    /// The body is chunked (Transfer-Encoding: chunked)
899    Chunked(ChunkedWrite<W>),
900}
901
902impl<W> SendBody<W>
903where
904    W: Write,
905{
906    /// Create a new body
907    ///
908    /// Parameters:
909    /// - `body_type`: The type of the body, as resolved using `BodyType::resolve`
910    /// - `output`: The raw output stream
911    pub fn new(body_type: BodyType, output: W) -> SendBody<W> {
912        match body_type {
913            BodyType::Chunked => SendBody::Chunked(ChunkedWrite::new(output)),
914            BodyType::ContentLen(content_len) => {
915                SendBody::ContentLen(ContentLenWrite::new(content_len, output))
916            }
917            BodyType::Raw => SendBody::Raw(output),
918        }
919    }
920
921    /// Check if the body has been completely written to
922    pub fn is_complete(&self) -> bool {
923        match self {
924            Self::ContentLen(w) => w.is_complete(),
925            _ => true,
926        }
927    }
928
929    /// Check if the body needs to be closed (i.e. the underlying output stream cannot be re-used for Keep-Alive connections)
930    pub fn needs_close(&self) -> bool {
931        !self.is_complete() || matches!(self, Self::Raw(_))
932    }
933
934    /// Finish writing the body (necessary for chunked encoding)
935    pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
936    where
937        W: Write,
938    {
939        match self {
940            Self::Raw(_) => (),
941            Self::ContentLen(w) => {
942                if !w.is_complete() {
943                    return Err(Error::IncompleteBody);
944                }
945            }
946            Self::Chunked(w) => w.finish().await?,
947        }
948
949        self.flush().await?;
950
951        Ok(())
952    }
953
954    /// Return a mutable reference to the underlying raw writer
955    pub fn as_raw_writer(&mut self) -> &mut W {
956        match self {
957            Self::Raw(w) => w,
958            Self::ContentLen(w) => &mut w.output,
959            Self::Chunked(w) => &mut w.output,
960        }
961    }
962
963    /// Release the body, returning the underlying raw writer
964    pub fn release(self) -> W {
965        match self {
966            Self::Raw(w) => w,
967            Self::ContentLen(w) => w.release(),
968            Self::Chunked(w) => w.release(),
969        }
970    }
971}
972
973impl<W> ErrorType for SendBody<W>
974where
975    W: ErrorType,
976{
977    type Error = Error<W::Error>;
978}
979
980impl<W> Write for SendBody<W>
981where
982    W: Write,
983{
984    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
985        match self {
986            Self::Raw(w) => Ok(w.write(buf).await.map_err(Error::Io)?),
987            Self::ContentLen(w) => Ok(w.write(buf).await?),
988            Self::Chunked(w) => Ok(w.write(buf).await?),
989        }
990    }
991
992    async fn flush(&mut self) -> Result<(), Self::Error> {
993        match self {
994            Self::Raw(w) => Ok(w.flush().await.map_err(Error::Io)?),
995            Self::ContentLen(w) => Ok(w.flush().await?),
996            Self::Chunked(w) => Ok(w.flush().await?),
997        }
998    }
999}
1000
1001pub(crate) struct ContentLenWrite<W> {
1002    content_len: u64,
1003    write_len: u64,
1004    output: W,
1005}
1006
1007impl<W> ContentLenWrite<W> {
1008    pub const fn new(content_len: u64, output: W) -> Self {
1009        Self {
1010            content_len,
1011            write_len: 0,
1012            output,
1013        }
1014    }
1015
1016    pub fn is_complete(&self) -> bool {
1017        self.content_len == self.write_len
1018    }
1019
1020    pub fn release(self) -> W {
1021        self.output
1022    }
1023}
1024
1025impl<W> ErrorType for ContentLenWrite<W>
1026where
1027    W: ErrorType,
1028{
1029    type Error = Error<W::Error>;
1030}
1031
1032impl<W> Write for ContentLenWrite<W>
1033where
1034    W: Write,
1035{
1036    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1037        if self.content_len >= self.write_len + buf.len() as u64 {
1038            let write = self.output.write(buf).await.map_err(Error::Io)?;
1039            self.write_len += write as u64;
1040
1041            Ok(write)
1042        } else {
1043            Err(Error::TooLongBody)
1044        }
1045    }
1046
1047    async fn flush(&mut self) -> Result<(), Self::Error> {
1048        self.output.flush().await.map_err(Error::Io)
1049    }
1050}
1051
1052pub(crate) struct ChunkedWrite<W> {
1053    output: W,
1054    finished: bool,
1055}
1056
1057impl<W> ChunkedWrite<W> {
1058    pub const fn new(output: W) -> Self {
1059        Self {
1060            output,
1061            finished: false,
1062        }
1063    }
1064
1065    pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
1066    where
1067        W: Write,
1068    {
1069        if !self.finished {
1070            self.output
1071                .write_all(b"0\r\n\r\n")
1072                .await
1073                .map_err(Error::Io)?;
1074            self.finished = true;
1075        }
1076
1077        Ok(())
1078    }
1079
1080    pub fn release(self) -> W {
1081        self.output
1082    }
1083}
1084
1085impl<W> ErrorType for ChunkedWrite<W>
1086where
1087    W: ErrorType,
1088{
1089    type Error = Error<W::Error>;
1090}
1091
1092impl<W> Write for ChunkedWrite<W>
1093where
1094    W: Write,
1095{
1096    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1097        if self.finished {
1098            Err(Error::InvalidState)
1099        } else if !buf.is_empty() {
1100            let mut len_str = heapless::String::<8>::new();
1101            write_unwrap!(&mut len_str, "{:x}", buf.len());
1102
1103            self.output
1104                .write_all(len_str.as_bytes())
1105                .await
1106                .map_err(Error::Io)?;
1107
1108            self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1109            self.output.write_all(buf).await.map_err(Error::Io)?;
1110            self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1111
1112            Ok(buf.len())
1113        } else {
1114            Ok(0)
1115        }
1116    }
1117
1118    async fn flush(&mut self) -> Result<(), Self::Error> {
1119        self.output.flush().await.map_err(Error::Io)
1120    }
1121}
1122
1123mod raw {
1124    use core::str;
1125
1126    use embedded_io_async::{Read, Write};
1127
1128    use crate::{BodyType, ConnectionType};
1129
1130    use super::Error;
1131
1132    pub(crate) async fn read_reply_buf<const N: usize, R>(
1133        mut input: R,
1134        buf: &mut [u8],
1135        request: bool,
1136        exact: bool,
1137    ) -> Result<(usize, usize), Error<R::Error>>
1138    where
1139        R: Read,
1140    {
1141        if exact {
1142            let raw_headers_len = read_headers(&mut input, buf).await?;
1143
1144            let mut headers = [httparse::EMPTY_HEADER; N];
1145
1146            let status = if request {
1147                httparse::Request::new(&mut headers).parse(&buf[..raw_headers_len])?
1148            } else {
1149                httparse::Response::new(&mut headers).parse(&buf[..raw_headers_len])?
1150            };
1151
1152            if let httparse::Status::Complete(headers_len) = status {
1153                return Ok((raw_headers_len, headers_len));
1154            }
1155
1156            Err(Error::TooManyHeaders)
1157        } else {
1158            let mut offset = 0;
1159            let mut size = 0;
1160
1161            while buf.len() > size {
1162                let read = input.read(&mut buf[offset..]).await.map_err(Error::Io)?;
1163                if read == 0 {
1164                    Err(if offset == 0 {
1165                        Error::ConnectionClosed
1166                    } else {
1167                        Error::IncompleteHeaders
1168                    })?;
1169                }
1170
1171                offset += read;
1172                size += read;
1173
1174                let mut headers = [httparse::EMPTY_HEADER; N];
1175
1176                let status = if request {
1177                    httparse::Request::new(&mut headers).parse(&buf[..size])?
1178                } else {
1179                    httparse::Response::new(&mut headers).parse(&buf[..size])?
1180                };
1181
1182                if let httparse::Status::Complete(headers_len) = status {
1183                    return Ok((size, headers_len));
1184                }
1185            }
1186
1187            Err(Error::TooManyHeaders)
1188        }
1189    }
1190
1191    pub(crate) async fn read_headers<R>(
1192        mut input: R,
1193        buf: &mut [u8],
1194    ) -> Result<usize, Error<R::Error>>
1195    where
1196        R: Read,
1197    {
1198        let mut offset = 0;
1199        let mut byte = [0];
1200
1201        loop {
1202            if offset == buf.len() {
1203                Err(Error::TooLongHeaders)?;
1204            }
1205
1206            let read = input.read(&mut byte).await.map_err(Error::Io)?;
1207
1208            if read == 0 {
1209                Err(if offset == 0 {
1210                    Error::ConnectionClosed
1211                } else {
1212                    Error::IncompleteHeaders
1213                })?;
1214            }
1215
1216            buf[offset] = byte[0];
1217
1218            offset += 1;
1219
1220            if offset >= b"\r\n\r\n".len() && buf[offset - 4..offset] == *b"\r\n\r\n" {
1221                break Ok(offset);
1222            }
1223        }
1224    }
1225
1226    pub(crate) async fn send_version<W>(mut output: W, http11: bool) -> Result<(), Error<W::Error>>
1227    where
1228        W: Write,
1229    {
1230        output
1231            .write_all(if http11 { b"HTTP/1.1" } else { b"HTTP/1.0" })
1232            .await
1233            .map_err(Error::Io)
1234    }
1235
1236    pub(crate) async fn send_headers<'a, H, W>(
1237        headers: H,
1238        mut output: W,
1239    ) -> Result<(Option<ConnectionType>, Option<BodyType>), Error<W::Error>>
1240    where
1241        W: Write,
1242        H: IntoIterator<Item = (&'a str, &'a [u8])>,
1243    {
1244        let mut connection = None;
1245        let mut body = None;
1246
1247        for (name, value) in headers.into_iter() {
1248            let header_connection =
1249                ConnectionType::from_header(name, unsafe { str::from_utf8_unchecked(value) });
1250
1251            if let Some(header_connection) = header_connection {
1252                if let Some(connection) = connection {
1253                    warn!(
1254                        "Multiple Connection headers found. Current {} and new {}",
1255                        connection, header_connection
1256                    );
1257                }
1258
1259                // The last connection header wins
1260                connection = Some(header_connection);
1261            }
1262
1263            let header_body =
1264                BodyType::from_header(name, unsafe { str::from_utf8_unchecked(value) });
1265
1266            if let Some(header_body) = header_body {
1267                if let Some(body) = body {
1268                    warn!(
1269                        "Multiple body type headers found. Current {} and new {}",
1270                        body, header_body
1271                    );
1272                }
1273
1274                // The last body header wins
1275                body = Some(header_body);
1276            }
1277
1278            send_header(name, value, &mut output).await?;
1279        }
1280
1281        Ok((connection, body))
1282    }
1283
1284    pub(crate) async fn send_header<W>(
1285        name: &str,
1286        value: &[u8],
1287        mut output: W,
1288    ) -> Result<(), Error<W::Error>>
1289    where
1290        W: Write,
1291    {
1292        output.write_all(name.as_bytes()).await.map_err(Error::Io)?;
1293        output.write_all(b": ").await.map_err(Error::Io)?;
1294        output.write_all(value).await.map_err(Error::Io)?;
1295        output.write_all(b"\r\n").await.map_err(Error::Io)?;
1296
1297        Ok(())
1298    }
1299
1300    pub(crate) async fn send_headers_end<W>(mut output: W) -> Result<(), Error<W::Error>>
1301    where
1302        W: Write,
1303    {
1304        output.write_all(b"\r\n").await.map_err(Error::Io)
1305    }
1306}
1307
1308#[cfg(test)]
1309mod test {
1310    use embedded_io_async::{ErrorType, Read};
1311
1312    use super::*;
1313
1314    struct SliceRead<'a>(&'a [u8]);
1315
1316    impl<'a> ErrorType for SliceRead<'a> {
1317        type Error = core::convert::Infallible;
1318    }
1319
1320    impl<'a> Read for SliceRead<'a> {
1321        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1322            let len = core::cmp::min(buf.len(), self.0.len());
1323            buf[..len].copy_from_slice(&self.0[..len]);
1324
1325            self.0 = &self.0[len..];
1326
1327            Ok(len)
1328        }
1329    }
1330
1331    #[test]
1332    fn test_chunked_bytes() {
1333        // Normal
1334        expect(b"A\r\nabcdefghij\r\n2\r\n42\r\n", Some(b"abcdefghij42"));
1335        expect(b"a\r\nabc\r\nfghij\r\n2\r\n42\r\n", Some(b"abc\r\nfghij42"));
1336
1337        // Trailing headers
1338        expect(b"4\r\nabcd\r\n0\r\n\r\n", Some(b"abcd"));
1339        expect(b"4\r\nabcd\r\n0\r\nA: B\r\n\r\n", Some(b"abcd"));
1340
1341        // Empty
1342        expect(b"", Some(b""));
1343        expect(b"0\r\n\r\n", Some(b""));
1344
1345        // Erroneous
1346        expect(b"h\r\n", None);
1347        expect(b"\r\na", None);
1348        expect(b"4\r\nabcdefg", None);
1349    }
1350
1351    fn expect(input: &[u8], expected: Option<&[u8]>) {
1352        embassy_futures::block_on(async move {
1353            let mut buf1 = [0; 64];
1354            let mut buf2 = [0; 64];
1355
1356            let stream = SliceRead(input);
1357            let mut r = ChunkedRead::new(stream, &mut buf1, 0);
1358
1359            if let Some(expected) = expected {
1360                assert!(r.read_exact(&mut buf2[..expected.len()]).await.is_ok());
1361
1362                assert_eq!(&buf2[..expected.len()], expected);
1363
1364                let len = r.read(&mut buf2).await;
1365                assert!(len.is_ok());
1366
1367                assert_eq!(unwrap!(len), 0);
1368            } else {
1369                assert!(r.read(&mut buf2).await.is_err());
1370            }
1371        })
1372    }
1373}