edge_http/
io.rs

1use core::cmp::min;
2use core::fmt::{Display, Write as _};
3use core::str;
4
5use embedded_io_async::{ErrorType, Read, Write};
6
7use httparse::Status;
8
9use crate::ws::UpgradeError;
10use crate::{
11    BodyType, ConnectionType, Headers, HeadersMismatchError, Method, RequestHeaders,
12    ResponseHeaders,
13};
14
15pub mod client;
16pub mod server;
17
/// An error in parsing the headers or the body.
///
/// `E` is the error type of the underlying I/O stream; it is only carried by the `Io` variant.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Error<E> {
    /// The HTTP headers or the request/status line are invalid
    InvalidHeaders,
    /// The HTTP body is invalid (e.g. broken chunked framing)
    InvalidBody,
    /// There are more HTTP headers than can be stored
    TooManyHeaders,
    /// The HTTP headers section is too long
    TooLongHeaders,
    /// The HTTP body is too long (e.g. more data written than the declared Content-Length)
    TooLongBody,
    /// The HTTP headers section is incomplete
    IncompleteHeaders,
    /// The HTTP body is incomplete (e.g. the stream ended in the middle of a chunk)
    IncompleteBody,
    /// The connection is not in the requested state
    InvalidState,
    /// The connection was closed
    ConnectionClosed,
    /// The headers do not agree with each other
    HeadersMismatchError(HeadersMismatchError),
    /// The WebSocket upgrade failed
    WsUpgradeError(UpgradeError),
    /// An error reported by the underlying I/O stream
    Io(E),
}
34
/// An [`Error`] whose I/O payload is an `edge_nal::io::ErrorKind` (see [`Error::erase`])
pub type ErrorKind = Error<edge_nal::io::ErrorKind>;
36
37impl<E> Error<E>
38where
39    E: edge_nal::io::Error,
40{
41    pub fn erase(&self) -> Error<edge_nal::io::ErrorKind> {
42        match self {
43            Self::InvalidHeaders => Error::InvalidHeaders,
44            Self::InvalidBody => Error::InvalidBody,
45            Self::TooManyHeaders => Error::TooManyHeaders,
46            Self::TooLongHeaders => Error::TooLongHeaders,
47            Self::TooLongBody => Error::TooLongBody,
48            Self::IncompleteHeaders => Error::IncompleteHeaders,
49            Self::IncompleteBody => Error::IncompleteBody,
50            Self::InvalidState => Error::InvalidState,
51            Self::ConnectionClosed => Error::ConnectionClosed,
52            Self::HeadersMismatchError(e) => Error::HeadersMismatchError(*e),
53            Self::WsUpgradeError(e) => Error::WsUpgradeError(*e),
54            Self::Io(e) => Error::Io(e.kind()),
55        }
56    }
57}
58
59impl<E> From<httparse::Error> for Error<E> {
60    fn from(e: httparse::Error) -> Self {
61        match e {
62            httparse::Error::HeaderName => Self::InvalidHeaders,
63            httparse::Error::HeaderValue => Self::InvalidHeaders,
64            httparse::Error::NewLine => Self::InvalidHeaders,
65            httparse::Error::Status => Self::InvalidHeaders,
66            httparse::Error::Token => Self::InvalidHeaders,
67            httparse::Error::TooManyHeaders => Self::TooManyHeaders,
68            httparse::Error::Version => Self::InvalidHeaders,
69        }
70    }
71}
72
73impl<E> From<HeadersMismatchError> for Error<E> {
74    fn from(e: HeadersMismatchError) -> Self {
75        Self::HeadersMismatchError(e)
76    }
77}
78
79impl<E> From<UpgradeError> for Error<E> {
80    fn from(e: UpgradeError) -> Self {
81        Self::WsUpgradeError(e)
82    }
83}
84
85impl<E> embedded_io_async::Error for Error<E>
86where
87    E: embedded_io_async::Error,
88{
89    fn kind(&self) -> embedded_io_async::ErrorKind {
90        match self {
91            Self::Io(e) => e.kind(),
92            _ => embedded_io_async::ErrorKind::Other,
93        }
94    }
95}
96
97impl<E> Display for Error<E>
98where
99    E: Display,
100{
101    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102        match self {
103            Self::InvalidHeaders => write!(f, "Invalid HTTP headers or status line"),
104            Self::InvalidBody => write!(f, "Invalid HTTP body"),
105            Self::TooManyHeaders => write!(f, "Too many HTTP headers"),
106            Self::TooLongHeaders => write!(f, "HTTP headers section is too long"),
107            Self::TooLongBody => write!(f, "HTTP body is too long"),
108            Self::IncompleteHeaders => write!(f, "HTTP headers section is incomplete"),
109            Self::IncompleteBody => write!(f, "HTTP body is incomplete"),
110            Self::InvalidState => write!(f, "Connection is not in requested state"),
111            Self::HeadersMismatchError(e) => write!(f, "Headers mismatch: {e}"),
112            Self::WsUpgradeError(e) => write!(f, "WebSocket upgrade error: {e}"),
113            Self::ConnectionClosed => write!(f, "Connection closed"),
114            Self::Io(e) => write!(f, "{e}"),
115        }
116    }
117}
118
// `defmt` counterpart of the `Display` implementation; only compiled with the `defmt` feature.
// The messages mirror the `Display` ones.
#[cfg(feature = "defmt")]
impl<E> defmt::Format for Error<E>
where
    E: defmt::Format,
{
    /// Write a human-readable description of the error to the `defmt` formatter
    fn format(&self, f: defmt::Formatter<'_>) {
        match self {
            Self::InvalidHeaders => defmt::write!(f, "Invalid HTTP headers or status line"),
            Self::InvalidBody => defmt::write!(f, "Invalid HTTP body"),
            Self::TooManyHeaders => defmt::write!(f, "Too many HTTP headers"),
            Self::TooLongHeaders => defmt::write!(f, "HTTP headers section is too long"),
            Self::TooLongBody => defmt::write!(f, "HTTP body is too long"),
            Self::IncompleteHeaders => defmt::write!(f, "HTTP headers section is incomplete"),
            Self::IncompleteBody => defmt::write!(f, "HTTP body is incomplete"),
            Self::InvalidState => defmt::write!(f, "Connection is not in requested state"),
            // Variants with a payload delegate to the payload's own `defmt::Format`
            Self::HeadersMismatchError(e) => defmt::write!(f, "Headers mismatch: {}", e),
            Self::WsUpgradeError(e) => defmt::write!(f, "WebSocket upgrade error: {}", e),
            Self::ConnectionClosed => defmt::write!(f, "Connection closed"),
            Self::Io(e) => defmt::write!(f, "{}", e),
        }
    }
}
141
// Marker implementation relying on the trait's default methods; available whenever the
// wrapped I/O error is itself a `core::error::Error`.
impl<E> core::error::Error for Error<E> where E: core::error::Error {}
143
144impl<'b, const N: usize> RequestHeaders<'b, N> {
145    /// Parse the headers from the input stream
146    pub async fn receive<R>(
147        &mut self,
148        buf: &'b mut [u8],
149        mut input: R,
150        exact: bool,
151    ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
152    where
153        R: Read,
154    {
155        let (read_len, headers_len) =
156            match raw::read_reply_buf::<N, _>(&mut input, buf, true, exact).await {
157                Ok(read_len) => read_len,
158                Err(e) => return Err(e),
159            };
160
161        let mut parser = httparse::Request::new(&mut self.headers.0);
162
163        let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
164
165        let status = match parser.parse(headers_buf) {
166            Ok(status) => status,
167            Err(e) => return Err(e.into()),
168        };
169
170        if let Status::Complete(headers_len2) = status {
171            if headers_len != headers_len2 {
172                unreachable!("Should not happen. HTTP header parsing is indeterminate.")
173            }
174
175            self.http11 = match parser.version {
176                Some(0) => false,
177                Some(1) => true,
178                _ => Err(Error::InvalidHeaders)?,
179            };
180
181            let method_str = parser.method.ok_or(Error::InvalidHeaders)?;
182            self.method = Method::new(method_str).ok_or(Error::InvalidHeaders)?;
183            self.path = parser.path.ok_or(Error::InvalidHeaders)?;
184
185            trace!("Received:\n{}", self);
186
187            Ok((body_buf, read_len - headers_len))
188        } else {
189            unreachable!("Secondary parse of already loaded buffer failed.")
190        }
191    }
192
193    /// Resolve the connection type and body type from the headers
194    pub fn resolve<E>(&self) -> Result<(ConnectionType, BodyType), Error<E>> {
195        self.headers.resolve::<E>(None, true, self.http11)
196    }
197
198    /// Send the headers to the output stream, returning the connection type and body type
199    pub async fn send<W>(
200        &self,
201        chunked_if_unspecified: bool,
202        mut output: W,
203    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
204    where
205        W: Write,
206    {
207        send_request(self.http11, self.method, self.path, &mut output).await?;
208
209        self.headers
210            .send(None, true, self.http11, chunked_if_unspecified, output)
211            .await
212    }
213}
214
215impl<'b, const N: usize> ResponseHeaders<'b, N> {
216    /// Parse the headers from the input stream
217    pub async fn receive<R>(
218        &mut self,
219        buf: &'b mut [u8],
220        mut input: R,
221        exact: bool,
222    ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
223    where
224        R: Read,
225    {
226        let (read_len, headers_len) =
227            raw::read_reply_buf::<N, _>(&mut input, buf, false, exact).await?;
228
229        let mut parser = httparse::Response::new(&mut self.headers.0);
230
231        let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
232
233        let status = parser.parse(headers_buf).map_err(Error::from)?;
234
235        if let Status::Complete(headers_len2) = status {
236            if headers_len != headers_len2 {
237                unreachable!("Should not happen. HTTP header parsing is indeterminate.")
238            }
239
240            self.http11 = match parser.version {
241                Some(0) => false,
242                Some(1) => true,
243                _ => Err(Error::InvalidHeaders)?,
244            };
245
246            self.code = parser.code.ok_or(Error::InvalidHeaders)?;
247            self.reason = parser.reason;
248
249            trace!("Received:\n{}", self);
250
251            Ok((body_buf, read_len - headers_len))
252        } else {
253            unreachable!("Secondary parse of already loaded buffer failed.")
254        }
255    }
256
257    /// Resolve the connection type and body type from the headers
258    pub fn resolve<E>(
259        &self,
260        request_connection_type: ConnectionType,
261    ) -> Result<(ConnectionType, BodyType), Error<E>> {
262        self.headers
263            .resolve::<E>(Some(request_connection_type), false, self.http11)
264    }
265
266    /// Send the headers to the output stream, returning the connection type and body type
267    pub async fn send<W>(
268        &self,
269        request_connection_type: ConnectionType,
270        chunked_if_unspecified: bool,
271        mut output: W,
272    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
273    where
274        W: Write,
275    {
276        send_status(self.http11, self.code, self.reason, &mut output).await?;
277
278        self.headers
279            .send(
280                Some(request_connection_type),
281                false,
282                self.http11,
283                chunked_if_unspecified,
284                output,
285            )
286            .await
287    }
288}
289
290pub(crate) async fn send_request<W>(
291    http11: bool,
292    method: Method,
293    path: &str,
294    mut output: W,
295) -> Result<(), Error<W::Error>>
296where
297    W: Write,
298{
299    // RFC 9112:   request-line   = method SP request-target SP HTTP-version
300
301    output
302        .write_all(method.as_str().as_bytes())
303        .await
304        .map_err(Error::Io)?;
305    output.write_all(b" ").await.map_err(Error::Io)?;
306    output.write_all(path.as_bytes()).await.map_err(Error::Io)?;
307    output.write_all(b" ").await.map_err(Error::Io)?;
308    raw::send_version(&mut output, http11).await?;
309    output.write_all(b"\r\n").await.map_err(Error::Io)?;
310
311    Ok(())
312}
313
314pub(crate) async fn send_status<W>(
315    http11: bool,
316    status: u16,
317    reason: Option<&str>,
318    mut output: W,
319) -> Result<(), Error<W::Error>>
320where
321    W: Write,
322{
323    // RFC 9112:   status-line = HTTP-version SP status-code SP [ reason-phrase ]
324
325    raw::send_version(&mut output, http11).await?;
326    output.write_all(b" ").await.map_err(Error::Io)?;
327    let status_str: heapless::String<5> = unwrap!(status.try_into());
328    output
329        .write_all(status_str.as_bytes())
330        .await
331        .map_err(Error::Io)?;
332    output.write_all(b" ").await.map_err(Error::Io)?;
333    if let Some(reason) = reason {
334        output
335            .write_all(reason.as_bytes())
336            .await
337            .map_err(Error::Io)?;
338    }
339    output.write_all(b"\r\n").await.map_err(Error::Io)?;
340
341    Ok(())
342}
343
344pub(crate) async fn send_headers<'a, H, W>(
345    headers: H,
346    carry_over_connection_type: Option<ConnectionType>,
347    request: bool,
348    http11: bool,
349    chunked_if_unspecified: bool,
350    mut output: W,
351) -> Result<(ConnectionType, BodyType), Error<W::Error>>
352where
353    W: Write,
354    H: IntoIterator<Item = &'a (&'a str, &'a str)>,
355{
356    let (headers_connection_type, headers_body_type) = raw::send_headers(
357        headers
358            .into_iter()
359            .map(|(name, value)| (*name, value.as_bytes())),
360        &mut output,
361    )
362    .await?;
363
364    send_headers_end(
365        headers_connection_type,
366        headers_body_type,
367        carry_over_connection_type,
368        request,
369        http11,
370        chunked_if_unspecified,
371        output,
372    )
373    .await
374}
375
376async fn send_headers_end<W>(
377    headers_connection_type: Option<ConnectionType>,
378    headers_body_type: Option<BodyType>,
379    carry_over_connection_type: Option<ConnectionType>,
380    request: bool,
381    http11: bool,
382    chunked_if_unspecified: bool,
383    mut output: W,
384) -> Result<(ConnectionType, BodyType), Error<W::Error>>
385where
386    W: Write,
387{
388    let connection_type =
389        ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
390
391    let body_type = BodyType::resolve(
392        headers_body_type,
393        connection_type,
394        request,
395        http11,
396        chunked_if_unspecified,
397    )?;
398
399    if headers_connection_type.is_none() {
400        // Send an explicit Connection-Type just in case
401        let (name, value) = connection_type.raw_header();
402
403        raw::send_header(name, value, &mut output).await?;
404    }
405
406    if headers_body_type.is_none() {
407        let mut buf = heapless::String::new();
408
409        if let Some((name, value)) = body_type.raw_header(&mut buf) {
410            // Send explicit body type header just in case or if the body type was upgraded
411            raw::send_header(name, value, &mut output).await?;
412        }
413    }
414
415    raw::send_headers_end(output).await?;
416
417    Ok((connection_type, body_type))
418}
419
420impl<const N: usize> Headers<'_, N> {
421    fn resolve<E>(
422        &self,
423        carry_over_connection_type: Option<ConnectionType>,
424        request: bool,
425        http11: bool,
426    ) -> Result<(ConnectionType, BodyType), Error<E>> {
427        let headers_connection_type = ConnectionType::from_headers(self.iter());
428        let headers_body_type = BodyType::from_headers(self.iter());
429
430        let connection_type =
431            ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
432        let body_type =
433            BodyType::resolve(headers_body_type, connection_type, request, http11, false)?;
434
435        Ok((connection_type, body_type))
436    }
437
438    async fn send<W>(
439        &self,
440        carry_over_connection_type: Option<ConnectionType>,
441        request: bool,
442        http11: bool,
443        chunked_if_unspecified: bool,
444        mut output: W,
445    ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
446    where
447        W: Write,
448    {
449        let (headers_connection_type, headers_body_type) =
450            raw::send_headers(self.iter_raw(), &mut output).await?;
451
452        send_headers_end(
453            headers_connection_type,
454            headers_body_type,
455            carry_over_connection_type,
456            request,
457            http11,
458            chunked_if_unspecified,
459            output,
460        )
461        .await
462    }
463}
464
/// Represents an incoming HTTP stream body
///
/// Implements the `Read` trait to read the body from the stream.
///
/// Each variant wraps the raw input in a `PartiallyRead`, so that body bytes which were
/// already read together with the headers can be delivered before the input is read again.
// The variants expose crate-private reader types, hence the lint is silenced
#[allow(private_interfaces)]
pub enum Body<'b, R> {
    /// The body is raw and should be read as is (only possible for HTTP responses with connection = Close)
    Raw(PartiallyRead<'b, R>),
    /// The body is of a known length (Content-Length)
    ContentLen(ContentLenRead<PartiallyRead<'b, R>>),
    /// The body is chunked (Transfer-Encoding: chunked)
    Chunked(ChunkedRead<'b, PartiallyRead<'b, R>>),
}
477
478impl<'b, R> Body<'b, R>
479where
480    R: Read,
481{
482    /// Create a new body
483    ///
484    /// Parameters:
485    /// - `body_type`: The type of the body, as resolved using `BodyType::resolve`
486    /// - `buf`: The buffer to use for reading the body
487    /// - `read_len`: The length of the buffer that has already been read when processing the icoming headers
488    /// - `input`: The raw input stream
489    pub fn new(body_type: BodyType, buf: &'b mut [u8], read_len: usize, input: R) -> Self {
490        match body_type {
491            BodyType::Chunked => Body::Chunked(ChunkedRead::new(
492                PartiallyRead::new(&[], input),
493                buf,
494                read_len,
495            )),
496            BodyType::ContentLen(content_len) => Body::ContentLen(ContentLenRead::new(
497                content_len,
498                PartiallyRead::new(&buf[..read_len], input),
499            )),
500            BodyType::Raw => Body::Raw(PartiallyRead::new(&buf[..read_len], input)),
501        }
502    }
503
504    /// Check if the body needs to be closed (i.e. the underlying input stream cannot be re-used for Keep-Alive connections)
505    pub fn needs_close(&self) -> bool {
506        !self.is_complete() || matches!(self, Self::Raw(_))
507    }
508
509    /// Check if the body has been completely read
510    pub fn is_complete(&self) -> bool {
511        match self {
512            Self::Raw(_) => true,
513            Self::ContentLen(r) => r.is_complete(),
514            Self::Chunked(r) => r.is_complete(),
515        }
516    }
517
518    /// Return a mutable reference to the underlying raw reader
519    pub fn as_raw_reader(&mut self) -> &mut R {
520        match self {
521            Self::Raw(r) => &mut r.input,
522            Self::ContentLen(r) => &mut r.input.input,
523            Self::Chunked(r) => &mut r.input.input,
524        }
525    }
526
527    /// Release the body, returning the underlying raw reader
528    pub fn release(self) -> R {
529        match self {
530            Self::Raw(r) => r.release(),
531            Self::ContentLen(r) => r.release().release(),
532            Self::Chunked(r) => r.release().release(),
533        }
534    }
535}
536
// Reading a body fails either at the protocol level or with the I/O error of the raw reader
impl<R> ErrorType for Body<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
543
544impl<R> Read for Body<'_, R>
545where
546    R: Read,
547{
548    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
549        match self {
550            Self::Raw(read) => Ok(read.read(buf).await.map_err(Error::Io)?),
551            Self::ContentLen(read) => Ok(read.read(buf).await?),
552            Self::Chunked(read) => Ok(read.read(buf).await?),
553        }
554    }
555}
556
/// A reader that first delivers bytes already sitting in a buffer, and only then reads
/// from the wrapped input
pub(crate) struct PartiallyRead<'b, R> {
    /// The bytes that were read ahead of time
    buf: &'b [u8],
    /// How many bytes of `buf` have been handed out so far
    read_len: usize,
    /// The wrapped input
    input: R,
}

impl<'b, R> PartiallyRead<'b, R> {
    /// Create a reader that delivers the content of `buf` before reading from `input`
    pub const fn new(buf: &'b [u8], input: R) -> Self {
        PartiallyRead {
            read_len: 0,
            buf,
            input,
        }
    }

    /// Release the reader, returning the wrapped input
    pub fn release(self) -> R {
        let Self { input, .. } = self;

        input
    }
}
584
// Serving buffered bytes cannot fail, so the only errors are those of the wrapped input
impl<R> ErrorType for PartiallyRead<'_, R>
where
    R: ErrorType,
{
    type Error = R::Error;
}
591
592impl<R> Read for PartiallyRead<'_, R>
593where
594    R: Read,
595{
596    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
597        if self.buf.len() > self.read_len {
598            let len = min(buf.len(), self.buf.len() - self.read_len);
599            buf[..len].copy_from_slice(&self.buf[self.read_len..self.read_len + len]);
600
601            self.read_len += len;
602
603            Ok(len)
604        } else {
605            Ok(self.input.read(buf).await?)
606        }
607    }
608}
609
/// A reader that delivers at most `content_len` bytes of the wrapped input
pub(crate) struct ContentLenRead<R> {
    /// The total length of the body
    content_len: u64,
    /// The number of body bytes read so far
    read_len: u64,
    /// The wrapped input
    input: R,
}

impl<R> ContentLenRead<R> {
    /// Create a reader for a body of `content_len` bytes coming from `input`
    pub const fn new(content_len: u64, input: R) -> Self {
        ContentLenRead {
            input,
            content_len,
            read_len: 0,
        }
    }

    /// Check if the whole body (all `content_len` bytes) has been read
    pub fn is_complete(&self) -> bool {
        self.read_len == self.content_len
    }

    /// Release the reader, returning the wrapped input
    pub fn release(self) -> R {
        let Self { input, .. } = self;

        input
    }
}
633
// Errors of the wrapped input are reported wrapped in `Error::Io`
impl<R> ErrorType for ContentLenRead<R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
640
641impl<R> Read for ContentLenRead<R>
642where
643    R: Read,
644{
645    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
646        let len = min(buf.len() as _, self.content_len - self.read_len);
647        if len > 0 {
648            let read = self
649                .input
650                .read(&mut buf[..len as _])
651                .await
652                .map_err(Error::Io)?;
653            self.read_len += read as u64;
654
655            Ok(read)
656        } else {
657            Ok(0)
658        }
659    }
660}
661
/// A reader that decodes a body sent with the chunked transfer encoding
pub(crate) struct ChunkedRead<'b, R> {
    /// Buffer into which the still encoded data is read from `input`
    buf: &'b mut [u8],
    /// Position of the next byte of `buf` to decode
    buf_offset: usize,
    /// Number of valid bytes at the start of `buf`
    buf_len: usize,
    /// The wrapped input, delivering the encoded stream
    input: R,
    /// Number of data bytes still to be delivered from the current chunk
    remain: u64,
    /// Set once the end of the chunked stream has been reached
    complete: bool,
}
670
671impl<'b, R> ChunkedRead<'b, R>
672where
673    R: Read,
674{
675    pub fn new(input: R, buf: &'b mut [u8], buf_len: usize) -> Self {
676        Self {
677            buf,
678            buf_offset: 0,
679            buf_len,
680            input,
681            remain: 0,
682            complete: false,
683        }
684    }
685
686    pub fn is_complete(&self) -> bool {
687        self.complete
688    }
689
690    pub fn release(self) -> R {
691        self.input
692    }
693
694    // The elegant pull parser taken from here:
695    // https://github.com/kchmck/uhttp_chunked_bytes.rs/blob/master/src/lib.rs
696    // Changes:
697    // - Converted to async
698    // - Iterators removed
699    // - Simpler error handling
700    // - Consumption of trailer
701    async fn next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
702        if self.complete {
703            return Ok(None);
704        }
705
706        if self.remain == 0 {
707            if let Some(size) = self.parse_size().await? {
708                // If chunk size is zero (final chunk), the stream is finished [RFC7230§4.1].
709                if size == 0 {
710                    self.consume_trailer().await?;
711                    self.complete = true;
712                    return Ok(None);
713                }
714
715                self.remain = size;
716            } else {
717                self.complete = true;
718                return Ok(None);
719            }
720        }
721
722        let next = self.input_fetch().await?;
723        self.remain -= 1;
724
725        // If current chunk is finished, verify it ends with CRLF [RFC7230§4.1].
726        if self.remain == 0 {
727            self.consume_multi(b"\r\n").await?;
728        }
729
730        Ok(Some(next))
731    }
732
733    // Parse the number of bytes in the next chunk.
734    async fn parse_size(&mut self) -> Result<Option<u64>, Error<R::Error>> {
735        let mut digits = [0_u8; 16];
736
737        let slice = match self.parse_digits(&mut digits[..]).await? {
738            Some(s) => str::from_utf8(s).map_err(|_| Error::InvalidBody)?,
739            None => return Ok(None),
740        };
741
742        let size = u64::from_str_radix(slice, 16).map_err(|_| Error::InvalidBody)?;
743
744        Ok(Some(size))
745    }
746
747    // Extract the hex digits for the current chunk size.
748    async fn parse_digits<'a>(
749        &'a mut self,
750        digits: &'a mut [u8],
751    ) -> Result<Option<&'a [u8]>, Error<R::Error>> {
752        // Number of hex digits that have been extracted.
753        let mut len = 0;
754
755        loop {
756            let b = match self.input_next().await? {
757                Some(b) => b,
758                None => {
759                    return if len == 0 {
760                        // If EOF at the beginning of a new chunk, the stream is finished.
761                        Ok(None)
762                    } else {
763                        Err(Error::IncompleteBody)
764                    };
765                }
766            };
767
768            match b {
769                b'\r' => {
770                    self.consume(b'\n').await?;
771                    break;
772                }
773                b';' => {
774                    self.consume_ext().await?;
775                    break;
776                }
777                _ => {
778                    match digits.get_mut(len) {
779                        Some(d) => *d = b,
780                        None => return Err(Error::InvalidBody),
781                    }
782
783                    len += 1;
784                }
785            }
786        }
787
788        Ok(Some(&digits[..len]))
789    }
790
791    // Consume and discard current chunk extension.
792    // This doesn't check whether the characters up to CRLF actually have correct syntax.
793    async fn consume_ext(&mut self) -> Result<(), Error<R::Error>> {
794        self.consume_header().await?;
795
796        Ok(())
797    }
798
799    // Consume and discard the optional trailer following the last chunk.
800    async fn consume_trailer(&mut self) -> Result<(), Error<R::Error>> {
801        while self.consume_header().await? {}
802
803        Ok(())
804    }
805
806    // Consume and discard each header in the optional trailer following the last chunk.
807    async fn consume_header(&mut self) -> Result<bool, Error<R::Error>> {
808        let mut first = self.input_fetch().await?;
809        let mut len = 1;
810
811        loop {
812            let second = self.input_fetch().await?;
813            len += 1;
814
815            if first == b'\r' && second == b'\n' {
816                return Ok(len > 2);
817            }
818
819            first = second;
820        }
821    }
822
823    // Verify the next bytes in the stream match the expectation.
824    async fn consume_multi(&mut self, bytes: &[u8]) -> Result<(), Error<R::Error>> {
825        for byte in bytes {
826            self.consume(*byte).await?;
827        }
828
829        Ok(())
830    }
831
832    // Verify the next byte in the stream is matching the expectation.
833    async fn consume(&mut self, byte: u8) -> Result<(), Error<R::Error>> {
834        if self.input_fetch().await? == byte {
835            Ok(())
836        } else {
837            Err(Error::InvalidBody)
838        }
839    }
840
841    async fn input_fetch(&mut self) -> Result<u8, Error<R::Error>> {
842        self.input_next().await?.ok_or(Error::IncompleteBody)
843    }
844
845    async fn input_next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
846        if self.buf_offset == self.buf_len {
847            self.buf_len = self.input.read(self.buf).await.map_err(Error::Io)?;
848            self.buf_offset = 0;
849        }
850
851        if self.buf_len > 0 {
852            let byte = self.buf[self.buf_offset];
853            self.buf_offset += 1;
854
855            Ok(Some(byte))
856        } else {
857            Ok(None)
858        }
859    }
860}
861
// Decoding fails either at the protocol level or with the I/O error of the wrapped input
impl<R> ErrorType for ChunkedRead<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
868
869impl<R> Read for ChunkedRead<'_, R>
870where
871    R: Read,
872{
873    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
874        for (index, byte_pos) in buf.iter_mut().enumerate() {
875            if let Some(byte) = self.next().await? {
876                *byte_pos = byte;
877            } else {
878                return Ok(index);
879            }
880        }
881
882        Ok(buf.len())
883    }
884}
885
/// Represents an outgoing HTTP stream body
///
/// Implements the `Write` trait to write the body to the stream
// The variants expose crate-private writer types, hence the lint is silenced
#[allow(private_interfaces)]
pub enum SendBody<W> {
    /// The body is raw and should be written as is (only possible for HTTP responses with connection = Close)
    Raw(W),
    /// The body is of a known length (Content-Length)
    ContentLen(ContentLenWrite<W>),
    /// The body is chunked (Transfer-Encoding: chunked)
    Chunked(ChunkedWrite<W>),
}
898
899impl<W> SendBody<W>
900where
901    W: Write,
902{
903    /// Create a new body
904    ///
905    /// Parameters:
906    /// - `body_type`: The type of the body, as resolved using `BodyType::resolve`
907    /// - `output`: The raw output stream
908    pub fn new(body_type: BodyType, output: W) -> SendBody<W> {
909        match body_type {
910            BodyType::Chunked => SendBody::Chunked(ChunkedWrite::new(output)),
911            BodyType::ContentLen(content_len) => {
912                SendBody::ContentLen(ContentLenWrite::new(content_len, output))
913            }
914            BodyType::Raw => SendBody::Raw(output),
915        }
916    }
917
918    /// Check if the body has been completely written to
919    pub fn is_complete(&self) -> bool {
920        match self {
921            Self::ContentLen(w) => w.is_complete(),
922            _ => true,
923        }
924    }
925
926    /// Check if the body needs to be closed (i.e. the underlying output stream cannot be re-used for Keep-Alive connections)
927    pub fn needs_close(&self) -> bool {
928        !self.is_complete() || matches!(self, Self::Raw(_))
929    }
930
931    /// Finish writing the body (necessary for chunked encoding)
932    pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
933    where
934        W: Write,
935    {
936        match self {
937            Self::Raw(_) => (),
938            Self::ContentLen(w) => {
939                if !w.is_complete() {
940                    return Err(Error::IncompleteBody);
941                }
942            }
943            Self::Chunked(w) => w.finish().await?,
944        }
945
946        self.flush().await?;
947
948        Ok(())
949    }
950
951    /// Return a mutable reference to the underlying raw writer
952    pub fn as_raw_writer(&mut self) -> &mut W {
953        match self {
954            Self::Raw(w) => w,
955            Self::ContentLen(w) => &mut w.output,
956            Self::Chunked(w) => &mut w.output,
957        }
958    }
959
960    /// Release the body, returning the underlying raw writer
961    pub fn release(self) -> W {
962        match self {
963            Self::Raw(w) => w,
964            Self::ContentLen(w) => w.release(),
965            Self::Chunked(w) => w.release(),
966        }
967    }
968}
969
// Writing a body fails either at the protocol level or with the I/O error of the raw writer
impl<W> ErrorType for SendBody<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
976
977impl<W> Write for SendBody<W>
978where
979    W: Write,
980{
981    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
982        match self {
983            Self::Raw(w) => Ok(w.write(buf).await.map_err(Error::Io)?),
984            Self::ContentLen(w) => Ok(w.write(buf).await?),
985            Self::Chunked(w) => Ok(w.write(buf).await?),
986        }
987    }
988
989    async fn flush(&mut self) -> Result<(), Self::Error> {
990        match self {
991            Self::Raw(w) => Ok(w.flush().await.map_err(Error::Io)?),
992            Self::ContentLen(w) => Ok(w.flush().await?),
993            Self::Chunked(w) => Ok(w.flush().await?),
994        }
995    }
996}
997
/// A writer wrapper that tracks how many bytes were written against a declared
/// content length.
pub(crate) struct ContentLenWrite<W> {
    /// Total number of body bytes that are expected to be written.
    content_len: u64,
    /// Number of body bytes written so far.
    write_len: u64,
    /// The wrapped writer.
    output: W,
}

impl<W> ContentLenWrite<W> {
    /// Wrap `output`, expecting exactly `content_len` bytes; nothing is counted
    /// as written yet.
    pub const fn new(content_len: u64, output: W) -> Self {
        Self {
            output,
            write_len: 0,
            content_len,
        }
    }

    /// `true` once the number of written bytes equals the declared content length.
    pub fn is_complete(&self) -> bool {
        self.write_len == self.content_len
    }

    /// Consume the wrapper and hand back the wrapped writer.
    pub fn release(self) -> W {
        let Self { output, .. } = self;

        output
    }
}
1021
1022impl<W> ErrorType for ContentLenWrite<W>
1023where
1024    W: ErrorType,
1025{
1026    type Error = Error<W::Error>;
1027}
1028
1029impl<W> Write for ContentLenWrite<W>
1030where
1031    W: Write,
1032{
1033    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1034        if self.content_len >= self.write_len + buf.len() as u64 {
1035            let write = self.output.write(buf).await.map_err(Error::Io)?;
1036            self.write_len += write as u64;
1037
1038            Ok(write)
1039        } else {
1040            Err(Error::TooLongBody)
1041        }
1042    }
1043
1044    async fn flush(&mut self) -> Result<(), Self::Error> {
1045        self.output.flush().await.map_err(Error::Io)
1046    }
1047}
1048
1049pub(crate) struct ChunkedWrite<W> {
1050    output: W,
1051    finished: bool,
1052}
1053
1054impl<W> ChunkedWrite<W> {
1055    pub const fn new(output: W) -> Self {
1056        Self {
1057            output,
1058            finished: false,
1059        }
1060    }
1061
1062    pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
1063    where
1064        W: Write,
1065    {
1066        if !self.finished {
1067            self.output
1068                .write_all(b"0\r\n\r\n")
1069                .await
1070                .map_err(Error::Io)?;
1071            self.finished = true;
1072        }
1073
1074        Ok(())
1075    }
1076
1077    pub fn release(self) -> W {
1078        self.output
1079    }
1080}
1081
1082impl<W> ErrorType for ChunkedWrite<W>
1083where
1084    W: ErrorType,
1085{
1086    type Error = Error<W::Error>;
1087}
1088
1089impl<W> Write for ChunkedWrite<W>
1090where
1091    W: Write,
1092{
1093    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1094        if self.finished {
1095            Err(Error::InvalidState)
1096        } else if !buf.is_empty() {
1097            let mut len_str = heapless::String::<8>::new();
1098            write_unwrap!(&mut len_str, "{:x}", buf.len());
1099
1100            self.output
1101                .write_all(len_str.as_bytes())
1102                .await
1103                .map_err(Error::Io)?;
1104
1105            self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1106            self.output.write_all(buf).await.map_err(Error::Io)?;
1107            self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1108
1109            Ok(buf.len())
1110        } else {
1111            Ok(0)
1112        }
1113    }
1114
1115    async fn flush(&mut self) -> Result<(), Self::Error> {
1116        self.output.flush().await.map_err(Error::Io)
1117    }
1118}
1119
1120mod raw {
1121    use core::str;
1122
1123    use embedded_io_async::{Read, Write};
1124
1125    use crate::{BodyType, ConnectionType};
1126
1127    use super::Error;
1128
1129    pub(crate) async fn read_reply_buf<const N: usize, R>(
1130        mut input: R,
1131        buf: &mut [u8],
1132        request: bool,
1133        exact: bool,
1134    ) -> Result<(usize, usize), Error<R::Error>>
1135    where
1136        R: Read,
1137    {
1138        if exact {
1139            let raw_headers_len = read_headers(&mut input, buf).await?;
1140
1141            let mut headers = [httparse::EMPTY_HEADER; N];
1142
1143            let status = if request {
1144                httparse::Request::new(&mut headers).parse(&buf[..raw_headers_len])?
1145            } else {
1146                httparse::Response::new(&mut headers).parse(&buf[..raw_headers_len])?
1147            };
1148
1149            if let httparse::Status::Complete(headers_len) = status {
1150                return Ok((raw_headers_len, headers_len));
1151            }
1152
1153            Err(Error::TooManyHeaders)
1154        } else {
1155            let mut offset = 0;
1156            let mut size = 0;
1157
1158            while buf.len() > size {
1159                let read = input.read(&mut buf[offset..]).await.map_err(Error::Io)?;
1160                if read == 0 {
1161                    Err(if offset == 0 {
1162                        Error::ConnectionClosed
1163                    } else {
1164                        Error::IncompleteHeaders
1165                    })?;
1166                }
1167
1168                offset += read;
1169                size += read;
1170
1171                let mut headers = [httparse::EMPTY_HEADER; N];
1172
1173                let status = if request {
1174                    httparse::Request::new(&mut headers).parse(&buf[..size])?
1175                } else {
1176                    httparse::Response::new(&mut headers).parse(&buf[..size])?
1177                };
1178
1179                if let httparse::Status::Complete(headers_len) = status {
1180                    return Ok((size, headers_len));
1181                }
1182            }
1183
1184            Err(Error::TooManyHeaders)
1185        }
1186    }
1187
1188    pub(crate) async fn read_headers<R>(
1189        mut input: R,
1190        buf: &mut [u8],
1191    ) -> Result<usize, Error<R::Error>>
1192    where
1193        R: Read,
1194    {
1195        let mut offset = 0;
1196        let mut byte = [0];
1197
1198        loop {
1199            if offset == buf.len() {
1200                Err(Error::TooLongHeaders)?;
1201            }
1202
1203            let read = input.read(&mut byte).await.map_err(Error::Io)?;
1204
1205            if read == 0 {
1206                Err(if offset == 0 {
1207                    Error::ConnectionClosed
1208                } else {
1209                    Error::IncompleteHeaders
1210                })?;
1211            }
1212
1213            buf[offset] = byte[0];
1214
1215            offset += 1;
1216
1217            if offset >= b"\r\n\r\n".len() && buf[offset - 4..offset] == *b"\r\n\r\n" {
1218                break Ok(offset);
1219            }
1220        }
1221    }
1222
1223    pub(crate) async fn send_version<W>(mut output: W, http11: bool) -> Result<(), Error<W::Error>>
1224    where
1225        W: Write,
1226    {
1227        output
1228            .write_all(if http11 { b"HTTP/1.1" } else { b"HTTP/1.0" })
1229            .await
1230            .map_err(Error::Io)
1231    }
1232
1233    pub(crate) async fn send_headers<'a, H, W>(
1234        headers: H,
1235        mut output: W,
1236    ) -> Result<(Option<ConnectionType>, Option<BodyType>), Error<W::Error>>
1237    where
1238        W: Write,
1239        H: IntoIterator<Item = (&'a str, &'a [u8])>,
1240    {
1241        let mut connection = None;
1242        let mut body = None;
1243
1244        for (name, value) in headers.into_iter() {
1245            let header_connection =
1246                ConnectionType::from_header(name, str::from_utf8(value).unwrap_or(""));
1247
1248            if let Some(header_connection) = header_connection {
1249                if let Some(connection) = connection {
1250                    warn!(
1251                        "Multiple Connection headers found. Current {} and new {}",
1252                        connection, header_connection
1253                    );
1254                }
1255
1256                // The last connection header wins
1257                connection = Some(header_connection);
1258            }
1259
1260            let header_body = BodyType::from_header(name, str::from_utf8(value).unwrap_or(""));
1261
1262            if let Some(header_body) = header_body {
1263                if let Some(body) = body {
1264                    warn!(
1265                        "Multiple body type headers found. Current {} and new {}",
1266                        body, header_body
1267                    );
1268                }
1269
1270                // The last body header wins
1271                body = Some(header_body);
1272            }
1273
1274            send_header(name, value, &mut output).await?;
1275        }
1276
1277        Ok((connection, body))
1278    }
1279
1280    pub(crate) async fn send_header<W>(
1281        name: &str,
1282        value: &[u8],
1283        mut output: W,
1284    ) -> Result<(), Error<W::Error>>
1285    where
1286        W: Write,
1287    {
1288        output.write_all(name.as_bytes()).await.map_err(Error::Io)?;
1289        output.write_all(b": ").await.map_err(Error::Io)?;
1290        output.write_all(value).await.map_err(Error::Io)?;
1291        output.write_all(b"\r\n").await.map_err(Error::Io)?;
1292
1293        Ok(())
1294    }
1295
1296    pub(crate) async fn send_headers_end<W>(mut output: W) -> Result<(), Error<W::Error>>
1297    where
1298        W: Write,
1299    {
1300        output.write_all(b"\r\n").await.map_err(Error::Io)
1301    }
1302}
1303
#[cfg(test)]
mod test {
    use embedded_io_async::{ErrorType, Read};

    use super::*;

    /// A reader that serves the bytes of a slice and then reports end of input.
    struct SliceRead<'a>(&'a [u8]);

    impl<'a> ErrorType for SliceRead<'a> {
        type Error = core::convert::Infallible;
    }

    impl<'a> Read for SliceRead<'a> {
        /// Copy as many of the remaining bytes as fit into `buf` and drop them
        /// from the front of the slice.
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            let len = buf.len().min(self.0.len());
            let (served, rest) = self.0.split_at(len);

            buf[..len].copy_from_slice(served);
            self.0 = rest;

            Ok(len)
        }
    }

    /// Decode a set of chunked bodies; `None` marks inputs whose first read
    /// must fail.
    #[test]
    fn test_chunked_bytes() {
        let cases: [(&[u8], Option<&[u8]>); 9] = [
            // Normal
            (b"A\r\nabcdefghij\r\n2\r\n42\r\n", Some(b"abcdefghij42")),
            (b"a\r\nabc\r\nfghij\r\n2\r\n42\r\n", Some(b"abc\r\nfghij42")),
            // Trailing headers
            (b"4\r\nabcd\r\n0\r\n\r\n", Some(b"abcd")),
            (b"4\r\nabcd\r\n0\r\nA: B\r\n\r\n", Some(b"abcd")),
            // Empty
            (b"", Some(b"")),
            (b"0\r\n\r\n", Some(b"")),
            // Erroneous
            (b"h\r\n", None),
            (b"\r\na", None),
            (b"4\r\nabcdefg", None),
        ];

        for (input, expected) in cases {
            expect(input, expected);
        }
    }

    /// Feed `input` through a `ChunkedRead`.
    ///
    /// With `Some(expected)`, exactly `expected` must be readable and the next
    /// read must then return 0 bytes. With `None`, the first read must fail.
    fn expect(input: &[u8], expected: Option<&[u8]>) {
        embassy_futures::block_on(async move {
            let mut chunk_buf = [0; 64];
            let mut out = [0; 64];

            let mut reader = ChunkedRead::new(SliceRead(input), &mut chunk_buf, 0);

            match expected {
                Some(expected) => {
                    let wanted = expected.len();

                    assert!(reader.read_exact(&mut out[..wanted]).await.is_ok());

                    assert_eq!(&out[..wanted], expected);

                    // Nothing may follow the expected payload.
                    let trailing = reader.read(&mut out).await;
                    assert!(trailing.is_ok());

                    assert_eq!(unwrap!(trailing), 0);
                }
                None => {
                    assert!(reader.read(&mut out).await.is_err());
                }
            }
        })
    }
}