1use core::cmp::min;
2use core::fmt::{Display, Write as _};
3use core::str;
4
5use embedded_io_async::{ErrorType, Read, Write};
6
7use httparse::Status;
8
9use crate::ws::UpgradeError;
10use crate::{
11 BodyType, ConnectionType, Headers, HeadersMismatchError, Method, RequestHeaders,
12 ResponseHeaders,
13};
14
15pub mod client;
16pub mod server;
17
/// Errors produced by the HTTP I/O layer.
///
/// `E` is the error type of the underlying transport; it is carried by the `Io` variant.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Error<E> {
    /// The HTTP headers or the request/status line are invalid.
    InvalidHeaders,
    /// The HTTP body is invalid.
    InvalidBody,
    /// There are more headers than the headers storage can hold.
    TooManyHeaders,
    /// The headers section does not fit in the supplied buffer.
    TooLongHeaders,
    /// More body data was supplied than announced.
    TooLongBody,
    /// The input ended before the headers section was complete.
    IncompleteHeaders,
    /// The body ended (or was finished) before it was complete.
    IncompleteBody,
    /// The connection is not in the requested state.
    InvalidState,
    /// The input was closed before any data was received.
    ConnectionClosed,
    /// The headers are inconsistent with each other or with the connection.
    HeadersMismatchError(HeadersMismatchError),
    /// A WebSocket upgrade failed.
    WsUpgradeError(UpgradeError),
    /// An error reported by the underlying transport.
    Io(E),
}
34
/// An [`Error`] whose I/O payload is an `edge_nal::io::ErrorKind`; this is the type
/// returned by [`Error::erase`].
pub type ErrorKind = Error<edge_nal::io::ErrorKind>;
36
37impl<E> Error<E>
38where
39 E: edge_nal::io::Error,
40{
41 pub fn erase(&self) -> Error<edge_nal::io::ErrorKind> {
42 match self {
43 Self::InvalidHeaders => Error::InvalidHeaders,
44 Self::InvalidBody => Error::InvalidBody,
45 Self::TooManyHeaders => Error::TooManyHeaders,
46 Self::TooLongHeaders => Error::TooLongHeaders,
47 Self::TooLongBody => Error::TooLongBody,
48 Self::IncompleteHeaders => Error::IncompleteHeaders,
49 Self::IncompleteBody => Error::IncompleteBody,
50 Self::InvalidState => Error::InvalidState,
51 Self::ConnectionClosed => Error::ConnectionClosed,
52 Self::HeadersMismatchError(e) => Error::HeadersMismatchError(*e),
53 Self::WsUpgradeError(e) => Error::WsUpgradeError(*e),
54 Self::Io(e) => Error::Io(e.kind()),
55 }
56 }
57}
58
59impl<E> From<httparse::Error> for Error<E> {
60 fn from(e: httparse::Error) -> Self {
61 match e {
62 httparse::Error::HeaderName => Self::InvalidHeaders,
63 httparse::Error::HeaderValue => Self::InvalidHeaders,
64 httparse::Error::NewLine => Self::InvalidHeaders,
65 httparse::Error::Status => Self::InvalidHeaders,
66 httparse::Error::Token => Self::InvalidHeaders,
67 httparse::Error::TooManyHeaders => Self::TooManyHeaders,
68 httparse::Error::Version => Self::InvalidHeaders,
69 }
70 }
71}
72
73impl<E> From<HeadersMismatchError> for Error<E> {
74 fn from(e: HeadersMismatchError) -> Self {
75 Self::HeadersMismatchError(e)
76 }
77}
78
79impl<E> From<UpgradeError> for Error<E> {
80 fn from(e: UpgradeError) -> Self {
81 Self::WsUpgradeError(e)
82 }
83}
84
85impl<E> embedded_io_async::Error for Error<E>
86where
87 E: embedded_io_async::Error,
88{
89 fn kind(&self) -> embedded_io_async::ErrorKind {
90 match self {
91 Self::Io(e) => e.kind(),
92 _ => embedded_io_async::ErrorKind::Other,
93 }
94 }
95}
96
97impl<E> Display for Error<E>
98where
99 E: Display,
100{
101 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102 match self {
103 Self::InvalidHeaders => write!(f, "Invalid HTTP headers or status line"),
104 Self::InvalidBody => write!(f, "Invalid HTTP body"),
105 Self::TooManyHeaders => write!(f, "Too many HTTP headers"),
106 Self::TooLongHeaders => write!(f, "HTTP headers section is too long"),
107 Self::TooLongBody => write!(f, "HTTP body is too long"),
108 Self::IncompleteHeaders => write!(f, "HTTP headers section is incomplete"),
109 Self::IncompleteBody => write!(f, "HTTP body is incomplete"),
110 Self::InvalidState => write!(f, "Connection is not in requested state"),
111 Self::HeadersMismatchError(e) => write!(f, "Headers mismatch: {e}"),
112 Self::WsUpgradeError(e) => write!(f, "WebSocket upgrade error: {e}"),
113 Self::ConnectionClosed => write!(f, "Connection closed"),
114 Self::Io(e) => write!(f, "{e}"),
115 }
116 }
117}
118
// `defmt` counterpart of the `Display` implementation; only compiled with the `defmt` feature.
#[cfg(feature = "defmt")]
impl<E> defmt::Format for Error<E>
where
    E: defmt::Format,
{
    /// Writes the error description to the `defmt` formatter.
    fn format(&self, f: defmt::Formatter<'_>) {
        match self {
            Self::InvalidHeaders => defmt::write!(f, "Invalid HTTP headers or status line"),
            Self::InvalidBody => defmt::write!(f, "Invalid HTTP body"),
            Self::TooManyHeaders => defmt::write!(f, "Too many HTTP headers"),
            Self::TooLongHeaders => defmt::write!(f, "HTTP headers section is too long"),
            Self::TooLongBody => defmt::write!(f, "HTTP body is too long"),
            Self::IncompleteHeaders => defmt::write!(f, "HTTP headers section is incomplete"),
            Self::IncompleteBody => defmt::write!(f, "HTTP body is incomplete"),
            Self::InvalidState => defmt::write!(f, "Connection is not in requested state"),
            Self::HeadersMismatchError(e) => defmt::write!(f, "Headers mismatch: {}", e),
            Self::WsUpgradeError(e) => defmt::write!(f, "WebSocket upgrade error: {}", e),
            Self::ConnectionClosed => defmt::write!(f, "Connection closed"),
            // Transport errors are written as-is, without a prefix
            Self::Io(e) => defmt::write!(f, "{}", e),
        }
    }
}
141
// Marker implementation, only compiled with the `std` feature; it relies on the derived
// `Debug` and on the `Display` implementation of `Error`.
#[cfg(feature = "std")]
impl<E> std::error::Error for Error<E> where E: std::error::Error {}
144
145impl<'b, const N: usize> RequestHeaders<'b, N> {
146 pub async fn receive<R>(
148 &mut self,
149 buf: &'b mut [u8],
150 mut input: R,
151 exact: bool,
152 ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
153 where
154 R: Read,
155 {
156 let (read_len, headers_len) =
157 match raw::read_reply_buf::<N, _>(&mut input, buf, true, exact).await {
158 Ok(read_len) => read_len,
159 Err(e) => return Err(e),
160 };
161
162 let mut parser = httparse::Request::new(&mut self.headers.0);
163
164 let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
165
166 let status = match parser.parse(headers_buf) {
167 Ok(status) => status,
168 Err(e) => return Err(e.into()),
169 };
170
171 if let Status::Complete(headers_len2) = status {
172 if headers_len != headers_len2 {
173 unreachable!("Should not happen. HTTP header parsing is indeterminate.")
174 }
175
176 self.http11 = match parser.version {
177 Some(0) => false,
178 Some(1) => true,
179 _ => Err(Error::InvalidHeaders)?,
180 };
181
182 let method_str = parser.method.ok_or(Error::InvalidHeaders)?;
183 self.method = Method::new(method_str).ok_or(Error::InvalidHeaders)?;
184 self.path = parser.path.ok_or(Error::InvalidHeaders)?;
185
186 trace!("Received:\n{}", self);
187
188 Ok((body_buf, read_len - headers_len))
189 } else {
190 unreachable!("Secondary parse of already loaded buffer failed.")
191 }
192 }
193
194 pub fn resolve<E>(&self) -> Result<(ConnectionType, BodyType), Error<E>> {
196 self.headers.resolve::<E>(None, true, self.http11)
197 }
198
199 pub async fn send<W>(
201 &self,
202 chunked_if_unspecified: bool,
203 mut output: W,
204 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
205 where
206 W: Write,
207 {
208 send_request(self.http11, self.method, self.path, &mut output).await?;
209
210 self.headers
211 .send(None, true, self.http11, chunked_if_unspecified, output)
212 .await
213 }
214}
215
216impl<'b, const N: usize> ResponseHeaders<'b, N> {
217 pub async fn receive<R>(
219 &mut self,
220 buf: &'b mut [u8],
221 mut input: R,
222 exact: bool,
223 ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
224 where
225 R: Read,
226 {
227 let (read_len, headers_len) =
228 raw::read_reply_buf::<N, _>(&mut input, buf, false, exact).await?;
229
230 let mut parser = httparse::Response::new(&mut self.headers.0);
231
232 let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
233
234 let status = parser.parse(headers_buf).map_err(Error::from)?;
235
236 if let Status::Complete(headers_len2) = status {
237 if headers_len != headers_len2 {
238 unreachable!("Should not happen. HTTP header parsing is indeterminate.")
239 }
240
241 self.http11 = match parser.version {
242 Some(0) => false,
243 Some(1) => true,
244 _ => Err(Error::InvalidHeaders)?,
245 };
246
247 self.code = parser.code.ok_or(Error::InvalidHeaders)?;
248 self.reason = parser.reason;
249
250 trace!("Received:\n{}", self);
251
252 Ok((body_buf, read_len - headers_len))
253 } else {
254 unreachable!("Secondary parse of already loaded buffer failed.")
255 }
256 }
257
258 pub fn resolve<E>(
260 &self,
261 request_connection_type: ConnectionType,
262 ) -> Result<(ConnectionType, BodyType), Error<E>> {
263 self.headers
264 .resolve::<E>(Some(request_connection_type), false, self.http11)
265 }
266
267 pub async fn send<W>(
269 &self,
270 request_connection_type: ConnectionType,
271 chunked_if_unspecified: bool,
272 mut output: W,
273 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
274 where
275 W: Write,
276 {
277 send_status(self.http11, self.code, self.reason, &mut output).await?;
278
279 self.headers
280 .send(
281 Some(request_connection_type),
282 false,
283 self.http11,
284 chunked_if_unspecified,
285 output,
286 )
287 .await
288 }
289}
290
291pub(crate) async fn send_request<W>(
292 http11: bool,
293 method: Method,
294 path: &str,
295 mut output: W,
296) -> Result<(), Error<W::Error>>
297where
298 W: Write,
299{
300 output
303 .write_all(method.as_str().as_bytes())
304 .await
305 .map_err(Error::Io)?;
306 output.write_all(b" ").await.map_err(Error::Io)?;
307 output.write_all(path.as_bytes()).await.map_err(Error::Io)?;
308 output.write_all(b" ").await.map_err(Error::Io)?;
309 raw::send_version(&mut output, http11).await?;
310 output.write_all(b"\r\n").await.map_err(Error::Io)?;
311
312 Ok(())
313}
314
315pub(crate) async fn send_status<W>(
316 http11: bool,
317 status: u16,
318 reason: Option<&str>,
319 mut output: W,
320) -> Result<(), Error<W::Error>>
321where
322 W: Write,
323{
324 raw::send_version(&mut output, http11).await?;
327 output.write_all(b" ").await.map_err(Error::Io)?;
328 let status_str: heapless::String<5> = unwrap!(status.try_into());
329 output
330 .write_all(status_str.as_bytes())
331 .await
332 .map_err(Error::Io)?;
333 output.write_all(b" ").await.map_err(Error::Io)?;
334 if let Some(reason) = reason {
335 output
336 .write_all(reason.as_bytes())
337 .await
338 .map_err(Error::Io)?;
339 }
340 output.write_all(b"\r\n").await.map_err(Error::Io)?;
341
342 Ok(())
343}
344
345pub(crate) async fn send_headers<'a, H, W>(
346 headers: H,
347 carry_over_connection_type: Option<ConnectionType>,
348 request: bool,
349 http11: bool,
350 chunked_if_unspecified: bool,
351 mut output: W,
352) -> Result<(ConnectionType, BodyType), Error<W::Error>>
353where
354 W: Write,
355 H: IntoIterator<Item = &'a (&'a str, &'a str)>,
356{
357 let (headers_connection_type, headers_body_type) = raw::send_headers(
358 headers
359 .into_iter()
360 .map(|(name, value)| (*name, value.as_bytes())),
361 &mut output,
362 )
363 .await?;
364
365 send_headers_end(
366 headers_connection_type,
367 headers_body_type,
368 carry_over_connection_type,
369 request,
370 http11,
371 chunked_if_unspecified,
372 output,
373 )
374 .await
375}
376
377async fn send_headers_end<W>(
378 headers_connection_type: Option<ConnectionType>,
379 headers_body_type: Option<BodyType>,
380 carry_over_connection_type: Option<ConnectionType>,
381 request: bool,
382 http11: bool,
383 chunked_if_unspecified: bool,
384 mut output: W,
385) -> Result<(ConnectionType, BodyType), Error<W::Error>>
386where
387 W: Write,
388{
389 let connection_type =
390 ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
391
392 let body_type = BodyType::resolve(
393 headers_body_type,
394 connection_type,
395 request,
396 http11,
397 chunked_if_unspecified,
398 )?;
399
400 if headers_connection_type.is_none() {
401 let (name, value) = connection_type.raw_header();
403
404 raw::send_header(name, value, &mut output).await?;
405 }
406
407 if headers_body_type.is_none() {
408 let mut buf = heapless::String::new();
409
410 if let Some((name, value)) = body_type.raw_header(&mut buf) {
411 raw::send_header(name, value, &mut output).await?;
413 }
414 }
415
416 raw::send_headers_end(output).await?;
417
418 Ok((connection_type, body_type))
419}
420
421impl<const N: usize> Headers<'_, N> {
422 fn resolve<E>(
423 &self,
424 carry_over_connection_type: Option<ConnectionType>,
425 request: bool,
426 http11: bool,
427 ) -> Result<(ConnectionType, BodyType), Error<E>> {
428 let headers_connection_type = ConnectionType::from_headers(self.iter());
429 let headers_body_type = BodyType::from_headers(self.iter());
430
431 let connection_type =
432 ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
433 let body_type =
434 BodyType::resolve(headers_body_type, connection_type, request, http11, false)?;
435
436 Ok((connection_type, body_type))
437 }
438
439 async fn send<W>(
440 &self,
441 carry_over_connection_type: Option<ConnectionType>,
442 request: bool,
443 http11: bool,
444 chunked_if_unspecified: bool,
445 mut output: W,
446 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
447 where
448 W: Write,
449 {
450 let (headers_connection_type, headers_body_type) =
451 raw::send_headers(self.iter_raw(), &mut output).await?;
452
453 send_headers_end(
454 headers_connection_type,
455 headers_body_type,
456 carry_over_connection_type,
457 request,
458 http11,
459 chunked_if_unspecified,
460 output,
461 )
462 .await
463 }
464}
465
/// A reader for an HTTP body, with one variant per body framing.
// The variants wrap crate-private reader types, hence the lint exception.
#[allow(private_interfaces)]
pub enum Body<'b, R> {
    /// Body without framing: bytes are passed through as they are read.
    Raw(PartiallyRead<'b, R>),
    /// Body with a length known upfront.
    ContentLen(ContentLenRead<PartiallyRead<'b, R>>),
    /// Body using the chunked encoding.
    Chunked(ChunkedRead<'b, PartiallyRead<'b, R>>),
}
478
479impl<'b, R> Body<'b, R>
480where
481 R: Read,
482{
483 pub fn new(body_type: BodyType, buf: &'b mut [u8], read_len: usize, input: R) -> Self {
491 match body_type {
492 BodyType::Chunked => Body::Chunked(ChunkedRead::new(
493 PartiallyRead::new(&[], input),
494 buf,
495 read_len,
496 )),
497 BodyType::ContentLen(content_len) => Body::ContentLen(ContentLenRead::new(
498 content_len,
499 PartiallyRead::new(&buf[..read_len], input),
500 )),
501 BodyType::Raw => Body::Raw(PartiallyRead::new(&buf[..read_len], input)),
502 }
503 }
504
505 pub fn needs_close(&self) -> bool {
507 !self.is_complete() || matches!(self, Self::Raw(_))
508 }
509
510 pub fn is_complete(&self) -> bool {
512 match self {
513 Self::Raw(_) => true,
514 Self::ContentLen(r) => r.is_complete(),
515 Self::Chunked(r) => r.is_complete(),
516 }
517 }
518
519 pub fn as_raw_reader(&mut self) -> &mut R {
521 match self {
522 Self::Raw(r) => &mut r.input,
523 Self::ContentLen(r) => &mut r.input.input,
524 Self::Chunked(r) => &mut r.input.input,
525 }
526 }
527
528 pub fn release(self) -> R {
530 match self {
531 Self::Raw(r) => r.release(),
532 Self::ContentLen(r) => r.release().release(),
533 Self::Chunked(r) => r.release().release(),
534 }
535 }
536}
537
// Reading a body reports HTTP-level errors wrapping the errors of the underlying reader.
impl<R> ErrorType for Body<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
544
545impl<R> Read for Body<'_, R>
546where
547 R: Read,
548{
549 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
550 match self {
551 Self::Raw(read) => Ok(read.read(buf).await.map_err(Error::Io)?),
552 Self::ContentLen(read) => Ok(read.read(buf).await?),
553 Self::Chunked(read) => Ok(read.read(buf).await?),
554 }
555 }
556}
557
/// A reader that first serves bytes from an in-memory buffer and then continues
/// with the wrapped reader.
pub(crate) struct PartiallyRead<'b, R> {
    // Bytes to serve before reading from `input`
    buf: &'b [u8],
    // How many bytes of `buf` were served already
    read_len: usize,
    // The wrapped reader
    input: R,
}
563
impl<'b, R> PartiallyRead<'b, R> {
    /// Creates a reader that serves `buf` first and then reads from `input`.
    pub const fn new(buf: &'b [u8], input: R) -> Self {
        Self {
            buf,
            read_len: 0,
            input,
        }
    }

    /// Consumes this reader and returns the wrapped one.
    pub fn release(self) -> R {
        self.input
    }
}
585
// Errors are those of the wrapped reader, passed through unchanged.
impl<R> ErrorType for PartiallyRead<'_, R>
where
    R: ErrorType,
{
    type Error = R::Error;
}
592
593impl<R> Read for PartiallyRead<'_, R>
594where
595 R: Read,
596{
597 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
598 if self.buf.len() > self.read_len {
599 let len = min(buf.len(), self.buf.len() - self.read_len);
600 buf[..len].copy_from_slice(&self.buf[self.read_len..self.read_len + len]);
601
602 self.read_len += len;
603
604 Ok(len)
605 } else {
606 Ok(self.input.read(buf).await?)
607 }
608 }
609}
610
/// A reader that yields at most `content_len` bytes from the wrapped reader.
pub(crate) struct ContentLenRead<R> {
    // Total length of the body
    content_len: u64,
    // Number of body bytes read so far
    read_len: u64,
    // The wrapped reader
    input: R,
}
616
impl<R> ContentLenRead<R> {
    /// Creates a reader for a body of `content_len` bytes coming from `input`.
    pub const fn new(content_len: u64, input: R) -> Self {
        Self {
            content_len,
            read_len: 0,
            input,
        }
    }

    /// Returns `true` once exactly `content_len` bytes were read.
    pub fn is_complete(&self) -> bool {
        self.content_len == self.read_len
    }

    /// Consumes this reader and returns the wrapped one.
    pub fn release(self) -> R {
        self.input
    }
}
634
// Errors of the wrapped reader are reported wrapped in the HTTP-level `Error`.
impl<R> ErrorType for ContentLenRead<R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
641
642impl<R> Read for ContentLenRead<R>
643where
644 R: Read,
645{
646 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
647 let len = min(buf.len() as _, self.content_len - self.read_len);
648 if len > 0 {
649 let read = self
650 .input
651 .read(&mut buf[..len as _])
652 .await
653 .map_err(Error::Io)?;
654 self.read_len += read as u64;
655
656 Ok(read)
657 } else {
658 Ok(0)
659 }
660 }
661}
662
/// A reader that decodes a body using the chunked encoding.
pub(crate) struct ChunkedRead<'b, R> {
    // Working buffer holding raw (still encoded) input
    buf: &'b mut [u8],
    // Position of the next unconsumed byte in `buf`
    buf_offset: usize,
    // Number of valid bytes in `buf`
    buf_len: usize,
    // The wrapped reader
    input: R,
    // Number of data bytes left in the current chunk
    remain: u64,
    // Set once the end of the body was reached
    complete: bool,
}
671
672impl<'b, R> ChunkedRead<'b, R>
673where
674 R: Read,
675{
676 pub fn new(input: R, buf: &'b mut [u8], buf_len: usize) -> Self {
677 Self {
678 buf,
679 buf_offset: 0,
680 buf_len,
681 input,
682 remain: 0,
683 complete: false,
684 }
685 }
686
687 pub fn is_complete(&self) -> bool {
688 self.complete
689 }
690
691 pub fn release(self) -> R {
692 self.input
693 }
694
695 async fn next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
703 if self.complete {
704 return Ok(None);
705 }
706
707 if self.remain == 0 {
708 if let Some(size) = self.parse_size().await? {
709 if size == 0 {
711 self.consume_trailer().await?;
712 self.complete = true;
713 return Ok(None);
714 }
715
716 self.remain = size;
717 } else {
718 self.complete = true;
719 return Ok(None);
720 }
721 }
722
723 let next = self.input_fetch().await?;
724 self.remain -= 1;
725
726 if self.remain == 0 {
728 self.consume_multi(b"\r\n").await?;
729 }
730
731 Ok(Some(next))
732 }
733
734 async fn parse_size(&mut self) -> Result<Option<u64>, Error<R::Error>> {
736 let mut digits = [0_u8; 16];
737
738 let slice = match self.parse_digits(&mut digits[..]).await? {
739 Some(s) => unsafe { str::from_utf8_unchecked(s) },
742 None => return Ok(None),
743 };
744
745 let size = u64::from_str_radix(slice, 16).map_err(|_| Error::InvalidBody)?;
746
747 Ok(Some(size))
748 }
749
750 async fn parse_digits<'a>(
752 &'a mut self,
753 digits: &'a mut [u8],
754 ) -> Result<Option<&'a [u8]>, Error<R::Error>> {
755 let mut len = 0;
757
758 loop {
759 let b = match self.input_next().await? {
760 Some(b) => b,
761 None => {
762 return if len == 0 {
763 Ok(None)
765 } else {
766 Err(Error::IncompleteBody)
767 };
768 }
769 };
770
771 match b {
772 b'\r' => {
773 self.consume(b'\n').await?;
774 break;
775 }
776 b';' => {
777 self.consume_ext().await?;
778 break;
779 }
780 _ => {
781 match digits.get_mut(len) {
782 Some(d) => *d = b,
783 None => return Err(Error::InvalidBody),
784 }
785
786 len += 1;
787 }
788 }
789 }
790
791 Ok(Some(&digits[..len]))
792 }
793
794 async fn consume_ext(&mut self) -> Result<(), Error<R::Error>> {
797 self.consume_header().await?;
798
799 Ok(())
800 }
801
802 async fn consume_trailer(&mut self) -> Result<(), Error<R::Error>> {
804 while self.consume_header().await? {}
805
806 Ok(())
807 }
808
809 async fn consume_header(&mut self) -> Result<bool, Error<R::Error>> {
811 let mut first = self.input_fetch().await?;
812 let mut len = 1;
813
814 loop {
815 let second = self.input_fetch().await?;
816 len += 1;
817
818 if first == b'\r' && second == b'\n' {
819 return Ok(len > 2);
820 }
821
822 first = second;
823 }
824 }
825
826 async fn consume_multi(&mut self, bytes: &[u8]) -> Result<(), Error<R::Error>> {
828 for byte in bytes {
829 self.consume(*byte).await?;
830 }
831
832 Ok(())
833 }
834
835 async fn consume(&mut self, byte: u8) -> Result<(), Error<R::Error>> {
837 if self.input_fetch().await? == byte {
838 Ok(())
839 } else {
840 Err(Error::InvalidBody)
841 }
842 }
843
844 async fn input_fetch(&mut self) -> Result<u8, Error<R::Error>> {
845 self.input_next().await?.ok_or(Error::IncompleteBody)
846 }
847
848 async fn input_next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
849 if self.buf_offset == self.buf_len {
850 self.buf_len = self.input.read(self.buf).await.map_err(Error::Io)?;
851 self.buf_offset = 0;
852 }
853
854 if self.buf_len > 0 {
855 let byte = self.buf[self.buf_offset];
856 self.buf_offset += 1;
857
858 Ok(Some(byte))
859 } else {
860 Ok(None)
861 }
862 }
863}
864
// Errors of the wrapped reader are reported wrapped in the HTTP-level `Error`.
impl<R> ErrorType for ChunkedRead<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
871
872impl<R> Read for ChunkedRead<'_, R>
873where
874 R: Read,
875{
876 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
877 for (index, byte_pos) in buf.iter_mut().enumerate() {
878 if let Some(byte) = self.next().await? {
879 *byte_pos = byte;
880 } else {
881 return Ok(index);
882 }
883 }
884
885 Ok(buf.len())
886 }
887}
888
/// A writer for an HTTP body, with one variant per body framing.
// The variants wrap crate-private writer types, hence the lint exception.
#[allow(private_interfaces)]
pub enum SendBody<W> {
    /// Body without framing: bytes are passed to the writer unchanged.
    Raw(W),
    /// Body with a length known upfront.
    ContentLen(ContentLenWrite<W>),
    /// Body using the chunked encoding.
    Chunked(ChunkedWrite<W>),
}
901
902impl<W> SendBody<W>
903where
904 W: Write,
905{
906 pub fn new(body_type: BodyType, output: W) -> SendBody<W> {
912 match body_type {
913 BodyType::Chunked => SendBody::Chunked(ChunkedWrite::new(output)),
914 BodyType::ContentLen(content_len) => {
915 SendBody::ContentLen(ContentLenWrite::new(content_len, output))
916 }
917 BodyType::Raw => SendBody::Raw(output),
918 }
919 }
920
921 pub fn is_complete(&self) -> bool {
923 match self {
924 Self::ContentLen(w) => w.is_complete(),
925 _ => true,
926 }
927 }
928
929 pub fn needs_close(&self) -> bool {
931 !self.is_complete() || matches!(self, Self::Raw(_))
932 }
933
934 pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
936 where
937 W: Write,
938 {
939 match self {
940 Self::Raw(_) => (),
941 Self::ContentLen(w) => {
942 if !w.is_complete() {
943 return Err(Error::IncompleteBody);
944 }
945 }
946 Self::Chunked(w) => w.finish().await?,
947 }
948
949 self.flush().await?;
950
951 Ok(())
952 }
953
954 pub fn as_raw_writer(&mut self) -> &mut W {
956 match self {
957 Self::Raw(w) => w,
958 Self::ContentLen(w) => &mut w.output,
959 Self::Chunked(w) => &mut w.output,
960 }
961 }
962
963 pub fn release(self) -> W {
965 match self {
966 Self::Raw(w) => w,
967 Self::ContentLen(w) => w.release(),
968 Self::Chunked(w) => w.release(),
969 }
970 }
971}
972
// Writing a body reports HTTP-level errors wrapping the errors of the underlying writer.
impl<W> ErrorType for SendBody<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
979
980impl<W> Write for SendBody<W>
981where
982 W: Write,
983{
984 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
985 match self {
986 Self::Raw(w) => Ok(w.write(buf).await.map_err(Error::Io)?),
987 Self::ContentLen(w) => Ok(w.write(buf).await?),
988 Self::Chunked(w) => Ok(w.write(buf).await?),
989 }
990 }
991
992 async fn flush(&mut self) -> Result<(), Self::Error> {
993 match self {
994 Self::Raw(w) => Ok(w.flush().await.map_err(Error::Io)?),
995 Self::ContentLen(w) => Ok(w.flush().await?),
996 Self::Chunked(w) => Ok(w.flush().await?),
997 }
998 }
999}
1000
/// A writer that accepts at most `content_len` bytes for the wrapped writer.
pub(crate) struct ContentLenWrite<W> {
    // Total length of the body
    content_len: u64,
    // Number of body bytes written so far
    write_len: u64,
    // The wrapped writer
    output: W,
}
1006
impl<W> ContentLenWrite<W> {
    /// Creates a writer for a body of `content_len` bytes going to `output`.
    pub const fn new(content_len: u64, output: W) -> Self {
        Self {
            content_len,
            write_len: 0,
            output,
        }
    }

    /// Returns `true` once exactly `content_len` bytes were written.
    pub fn is_complete(&self) -> bool {
        self.content_len == self.write_len
    }

    /// Consumes this writer and returns the wrapped one.
    pub fn release(self) -> W {
        self.output
    }
}
1024
// Errors of the wrapped writer are reported wrapped in the HTTP-level `Error`.
impl<W> ErrorType for ContentLenWrite<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
1031
1032impl<W> Write for ContentLenWrite<W>
1033where
1034 W: Write,
1035{
1036 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1037 if self.content_len >= self.write_len + buf.len() as u64 {
1038 let write = self.output.write(buf).await.map_err(Error::Io)?;
1039 self.write_len += write as u64;
1040
1041 Ok(write)
1042 } else {
1043 Err(Error::TooLongBody)
1044 }
1045 }
1046
1047 async fn flush(&mut self) -> Result<(), Self::Error> {
1048 self.output.flush().await.map_err(Error::Io)
1049 }
1050}
1051
/// A writer that encodes a body using the chunked encoding.
pub(crate) struct ChunkedWrite<W> {
    // The wrapped writer
    output: W,
    // Set once the terminating chunk was written
    finished: bool,
}
1056
1057impl<W> ChunkedWrite<W> {
1058 pub const fn new(output: W) -> Self {
1059 Self {
1060 output,
1061 finished: false,
1062 }
1063 }
1064
1065 pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
1066 where
1067 W: Write,
1068 {
1069 if !self.finished {
1070 self.output
1071 .write_all(b"0\r\n\r\n")
1072 .await
1073 .map_err(Error::Io)?;
1074 self.finished = true;
1075 }
1076
1077 Ok(())
1078 }
1079
1080 pub fn release(self) -> W {
1081 self.output
1082 }
1083}
1084
// Errors of the wrapped writer are reported wrapped in the HTTP-level `Error`.
impl<W> ErrorType for ChunkedWrite<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
1091
1092impl<W> Write for ChunkedWrite<W>
1093where
1094 W: Write,
1095{
1096 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1097 if self.finished {
1098 Err(Error::InvalidState)
1099 } else if !buf.is_empty() {
1100 let mut len_str = heapless::String::<8>::new();
1101 write_unwrap!(&mut len_str, "{:x}", buf.len());
1102
1103 self.output
1104 .write_all(len_str.as_bytes())
1105 .await
1106 .map_err(Error::Io)?;
1107
1108 self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1109 self.output.write_all(buf).await.map_err(Error::Io)?;
1110 self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1111
1112 Ok(buf.len())
1113 } else {
1114 Ok(0)
1115 }
1116 }
1117
1118 async fn flush(&mut self) -> Result<(), Self::Error> {
1119 self.output.flush().await.map_err(Error::Io)
1120 }
1121}
1122
1123mod raw {
1124 use core::str;
1125
1126 use embedded_io_async::{Read, Write};
1127
1128 use crate::{BodyType, ConnectionType};
1129
1130 use super::Error;
1131
1132 pub(crate) async fn read_reply_buf<const N: usize, R>(
1133 mut input: R,
1134 buf: &mut [u8],
1135 request: bool,
1136 exact: bool,
1137 ) -> Result<(usize, usize), Error<R::Error>>
1138 where
1139 R: Read,
1140 {
1141 if exact {
1142 let raw_headers_len = read_headers(&mut input, buf).await?;
1143
1144 let mut headers = [httparse::EMPTY_HEADER; N];
1145
1146 let status = if request {
1147 httparse::Request::new(&mut headers).parse(&buf[..raw_headers_len])?
1148 } else {
1149 httparse::Response::new(&mut headers).parse(&buf[..raw_headers_len])?
1150 };
1151
1152 if let httparse::Status::Complete(headers_len) = status {
1153 return Ok((raw_headers_len, headers_len));
1154 }
1155
1156 Err(Error::TooManyHeaders)
1157 } else {
1158 let mut offset = 0;
1159 let mut size = 0;
1160
1161 while buf.len() > size {
1162 let read = input.read(&mut buf[offset..]).await.map_err(Error::Io)?;
1163 if read == 0 {
1164 Err(if offset == 0 {
1165 Error::ConnectionClosed
1166 } else {
1167 Error::IncompleteHeaders
1168 })?;
1169 }
1170
1171 offset += read;
1172 size += read;
1173
1174 let mut headers = [httparse::EMPTY_HEADER; N];
1175
1176 let status = if request {
1177 httparse::Request::new(&mut headers).parse(&buf[..size])?
1178 } else {
1179 httparse::Response::new(&mut headers).parse(&buf[..size])?
1180 };
1181
1182 if let httparse::Status::Complete(headers_len) = status {
1183 return Ok((size, headers_len));
1184 }
1185 }
1186
1187 Err(Error::TooManyHeaders)
1188 }
1189 }
1190
1191 pub(crate) async fn read_headers<R>(
1192 mut input: R,
1193 buf: &mut [u8],
1194 ) -> Result<usize, Error<R::Error>>
1195 where
1196 R: Read,
1197 {
1198 let mut offset = 0;
1199 let mut byte = [0];
1200
1201 loop {
1202 if offset == buf.len() {
1203 Err(Error::TooLongHeaders)?;
1204 }
1205
1206 let read = input.read(&mut byte).await.map_err(Error::Io)?;
1207
1208 if read == 0 {
1209 Err(if offset == 0 {
1210 Error::ConnectionClosed
1211 } else {
1212 Error::IncompleteHeaders
1213 })?;
1214 }
1215
1216 buf[offset] = byte[0];
1217
1218 offset += 1;
1219
1220 if offset >= b"\r\n\r\n".len() && buf[offset - 4..offset] == *b"\r\n\r\n" {
1221 break Ok(offset);
1222 }
1223 }
1224 }
1225
1226 pub(crate) async fn send_version<W>(mut output: W, http11: bool) -> Result<(), Error<W::Error>>
1227 where
1228 W: Write,
1229 {
1230 output
1231 .write_all(if http11 { b"HTTP/1.1" } else { b"HTTP/1.0" })
1232 .await
1233 .map_err(Error::Io)
1234 }
1235
1236 pub(crate) async fn send_headers<'a, H, W>(
1237 headers: H,
1238 mut output: W,
1239 ) -> Result<(Option<ConnectionType>, Option<BodyType>), Error<W::Error>>
1240 where
1241 W: Write,
1242 H: IntoIterator<Item = (&'a str, &'a [u8])>,
1243 {
1244 let mut connection = None;
1245 let mut body = None;
1246
1247 for (name, value) in headers.into_iter() {
1248 let header_connection =
1249 ConnectionType::from_header(name, unsafe { str::from_utf8_unchecked(value) });
1250
1251 if let Some(header_connection) = header_connection {
1252 if let Some(connection) = connection {
1253 warn!(
1254 "Multiple Connection headers found. Current {} and new {}",
1255 connection, header_connection
1256 );
1257 }
1258
1259 connection = Some(header_connection);
1261 }
1262
1263 let header_body =
1264 BodyType::from_header(name, unsafe { str::from_utf8_unchecked(value) });
1265
1266 if let Some(header_body) = header_body {
1267 if let Some(body) = body {
1268 warn!(
1269 "Multiple body type headers found. Current {} and new {}",
1270 body, header_body
1271 );
1272 }
1273
1274 body = Some(header_body);
1276 }
1277
1278 send_header(name, value, &mut output).await?;
1279 }
1280
1281 Ok((connection, body))
1282 }
1283
1284 pub(crate) async fn send_header<W>(
1285 name: &str,
1286 value: &[u8],
1287 mut output: W,
1288 ) -> Result<(), Error<W::Error>>
1289 where
1290 W: Write,
1291 {
1292 output.write_all(name.as_bytes()).await.map_err(Error::Io)?;
1293 output.write_all(b": ").await.map_err(Error::Io)?;
1294 output.write_all(value).await.map_err(Error::Io)?;
1295 output.write_all(b"\r\n").await.map_err(Error::Io)?;
1296
1297 Ok(())
1298 }
1299
1300 pub(crate) async fn send_headers_end<W>(mut output: W) -> Result<(), Error<W::Error>>
1301 where
1302 W: Write,
1303 {
1304 output.write_all(b"\r\n").await.map_err(Error::Io)
1305 }
1306}
1307
// Unit tests for the chunked body decoder.
#[cfg(test)]
mod test {
    use embedded_io_async::{ErrorType, Read};

    use super::*;

    /// An in-memory reader over a byte slice that never fails.
    struct SliceRead<'a>(&'a [u8]);

    impl<'a> ErrorType for SliceRead<'a> {
        type Error = core::convert::Infallible;
    }

    impl<'a> Read for SliceRead<'a> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            let len = core::cmp::min(buf.len(), self.0.len());
            buf[..len].copy_from_slice(&self.0[..len]);

            // Drop what was just handed out
            self.0 = &self.0[len..];

            Ok(len)
        }
    }

    #[test]
    fn test_chunked_bytes() {
        // Several chunks; the chunk data may itself contain line breaks
        expect(b"A\r\nabcdefghij\r\n2\r\n42\r\n", Some(b"abcdefghij42"));
        expect(b"a\r\nabc\r\nfghij\r\n2\r\n42\r\n", Some(b"abc\r\nfghij42"));

        // Terminating chunk, without and with a trailer
        expect(b"4\r\nabcd\r\n0\r\n\r\n", Some(b"abcd"));
        expect(b"4\r\nabcd\r\n0\r\nA: B\r\n\r\n", Some(b"abcd"));

        // Empty bodies
        expect(b"", Some(b""));
        expect(b"0\r\n\r\n", Some(b""));

        // Malformed or truncated input
        expect(b"h\r\n", None);
        expect(b"\r\na", None);
        expect(b"4\r\nabcdefg", None);
    }

    /// Decodes `input` as a chunked body and checks the outcome: `Some(bytes)` means
    /// that exactly `bytes` must be decoded, followed by the end of the body; `None`
    /// means that reading must fail.
    fn expect(input: &[u8], expected: Option<&[u8]>) {
        embassy_futures::block_on(async move {
            let mut buf1 = [0; 64];
            let mut buf2 = [0; 64];

            let stream = SliceRead(input);
            let mut r = ChunkedRead::new(stream, &mut buf1, 0);

            if let Some(expected) = expected {
                assert!(r.read_exact(&mut buf2[..expected.len()]).await.is_ok());

                assert_eq!(&buf2[..expected.len()], expected);

                // Nothing must be left after the expected data
                let len = r.read(&mut buf2).await;
                assert!(len.is_ok());

                assert_eq!(unwrap!(len), 0);
            } else {
                assert!(r.read(&mut buf2).await.is_err());
            }
        })
    }
}