1use core::cmp::min;
2use core::fmt::{Display, Write as _};
3use core::str;
4
5use embedded_io_async::{ErrorType, Read, Write};
6
7use httparse::Status;
8
9use crate::ws::UpgradeError;
10use crate::{
11 BodyType, ConnectionType, Headers, HeadersMismatchError, Method, RequestHeaders,
12 ResponseHeaders,
13};
14
15pub mod client;
16pub mod server;
17
/// An error that can occur while sending or receiving HTTP headers and bodies.
///
/// `E` is the error type of the underlying reader/writer and is carried by the `Io` variant.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Error<E> {
    /// Invalid HTTP headers or status line (also used when the parsed request/response
    /// lacks a supported version, method, path or status code).
    InvalidHeaders,
    /// Invalid HTTP body (e.g. a malformed chunk size or chunk delimiter).
    InvalidBody,
    /// Too many HTTP headers (e.g. reported by `httparse` as `TooManyHeaders`).
    TooManyHeaders,
    /// The HTTP headers section is too long to fit in the supplied buffer.
    TooLongHeaders,
    /// More body data was written than the declared content length allows.
    TooLongBody,
    /// The input ended after some - but not all - of the headers section was read.
    IncompleteHeaders,
    /// The body ended prematurely, or a body was finished before it was fully written.
    IncompleteBody,
    /// The connection is not in the requested state (e.g. writing to an already finished
    /// chunked body).
    InvalidState,
    /// The input ended before any header data was read.
    ConnectionClosed,
    /// The headers are contradictory with regards to the connection and/or body type.
    HeadersMismatchError(HeadersMismatchError),
    /// A WebSocket upgrade error.
    WsUpgradeError(UpgradeError),
    /// An error reported by the underlying reader or writer.
    Io(E),
}
34
/// An [`Error`] whose I/O payload is reduced to `edge_nal::io::ErrorKind`
/// (the type produced by [`Error::erase`]).
pub type ErrorKind = Error<edge_nal::io::ErrorKind>;
36
37impl<E> Error<E>
38where
39 E: edge_nal::io::Error,
40{
41 pub fn erase(&self) -> Error<edge_nal::io::ErrorKind> {
42 match self {
43 Self::InvalidHeaders => Error::InvalidHeaders,
44 Self::InvalidBody => Error::InvalidBody,
45 Self::TooManyHeaders => Error::TooManyHeaders,
46 Self::TooLongHeaders => Error::TooLongHeaders,
47 Self::TooLongBody => Error::TooLongBody,
48 Self::IncompleteHeaders => Error::IncompleteHeaders,
49 Self::IncompleteBody => Error::IncompleteBody,
50 Self::InvalidState => Error::InvalidState,
51 Self::ConnectionClosed => Error::ConnectionClosed,
52 Self::HeadersMismatchError(e) => Error::HeadersMismatchError(*e),
53 Self::WsUpgradeError(e) => Error::WsUpgradeError(*e),
54 Self::Io(e) => Error::Io(e.kind()),
55 }
56 }
57}
58
59impl<E> From<httparse::Error> for Error<E> {
60 fn from(e: httparse::Error) -> Self {
61 match e {
62 httparse::Error::HeaderName => Self::InvalidHeaders,
63 httparse::Error::HeaderValue => Self::InvalidHeaders,
64 httparse::Error::NewLine => Self::InvalidHeaders,
65 httparse::Error::Status => Self::InvalidHeaders,
66 httparse::Error::Token => Self::InvalidHeaders,
67 httparse::Error::TooManyHeaders => Self::TooManyHeaders,
68 httparse::Error::Version => Self::InvalidHeaders,
69 }
70 }
71}
72
73impl<E> From<HeadersMismatchError> for Error<E> {
74 fn from(e: HeadersMismatchError) -> Self {
75 Self::HeadersMismatchError(e)
76 }
77}
78
79impl<E> From<UpgradeError> for Error<E> {
80 fn from(e: UpgradeError) -> Self {
81 Self::WsUpgradeError(e)
82 }
83}
84
85impl<E> embedded_io_async::Error for Error<E>
86where
87 E: embedded_io_async::Error,
88{
89 fn kind(&self) -> embedded_io_async::ErrorKind {
90 match self {
91 Self::Io(e) => e.kind(),
92 _ => embedded_io_async::ErrorKind::Other,
93 }
94 }
95}
96
97impl<E> Display for Error<E>
98where
99 E: Display,
100{
101 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102 match self {
103 Self::InvalidHeaders => write!(f, "Invalid HTTP headers or status line"),
104 Self::InvalidBody => write!(f, "Invalid HTTP body"),
105 Self::TooManyHeaders => write!(f, "Too many HTTP headers"),
106 Self::TooLongHeaders => write!(f, "HTTP headers section is too long"),
107 Self::TooLongBody => write!(f, "HTTP body is too long"),
108 Self::IncompleteHeaders => write!(f, "HTTP headers section is incomplete"),
109 Self::IncompleteBody => write!(f, "HTTP body is incomplete"),
110 Self::InvalidState => write!(f, "Connection is not in requested state"),
111 Self::HeadersMismatchError(e) => write!(f, "Headers mismatch: {e}"),
112 Self::WsUpgradeError(e) => write!(f, "WebSocket upgrade error: {e}"),
113 Self::ConnectionClosed => write!(f, "Connection closed"),
114 Self::Io(e) => write!(f, "{e}"),
115 }
116 }
117}
118
/// `defmt` rendering of [`Error`]; only compiled when the `defmt` feature is enabled.
///
/// The messages are the same as the ones used by the `Display` implementation.
#[cfg(feature = "defmt")]
impl<E> defmt::Format for Error<E>
where
    E: defmt::Format,
{
    fn format(&self, f: defmt::Formatter<'_>) {
        match self {
            Self::InvalidHeaders => defmt::write!(f, "Invalid HTTP headers or status line"),
            Self::InvalidBody => defmt::write!(f, "Invalid HTTP body"),
            Self::TooManyHeaders => defmt::write!(f, "Too many HTTP headers"),
            Self::TooLongHeaders => defmt::write!(f, "HTTP headers section is too long"),
            Self::TooLongBody => defmt::write!(f, "HTTP body is too long"),
            Self::IncompleteHeaders => defmt::write!(f, "HTTP headers section is incomplete"),
            Self::IncompleteBody => defmt::write!(f, "HTTP body is incomplete"),
            Self::InvalidState => defmt::write!(f, "Connection is not in requested state"),
            // Variants with a payload delegate to the payload's own `defmt::Format`
            Self::HeadersMismatchError(e) => defmt::write!(f, "Headers mismatch: {}", e),
            Self::WsUpgradeError(e) => defmt::write!(f, "WebSocket upgrade error: {}", e),
            Self::ConnectionClosed => defmt::write!(f, "Connection closed"),
            Self::Io(e) => defmt::write!(f, "{}", e),
        }
    }
}
141
/// Marks [`Error`] as a standard error type whenever the wrapped I/O error is one too.
impl<E> core::error::Error for Error<E> where E: core::error::Error {}
143
144impl<'b, const N: usize> RequestHeaders<'b, N> {
145 pub async fn receive<R>(
147 &mut self,
148 buf: &'b mut [u8],
149 mut input: R,
150 exact: bool,
151 ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
152 where
153 R: Read,
154 {
155 let (read_len, headers_len) =
156 match raw::read_reply_buf::<N, _>(&mut input, buf, true, exact).await {
157 Ok(read_len) => read_len,
158 Err(e) => return Err(e),
159 };
160
161 let mut parser = httparse::Request::new(&mut self.headers.0);
162
163 let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
164
165 let status = match parser.parse(headers_buf) {
166 Ok(status) => status,
167 Err(e) => return Err(e.into()),
168 };
169
170 if let Status::Complete(headers_len2) = status {
171 if headers_len != headers_len2 {
172 unreachable!("Should not happen. HTTP header parsing is indeterminate.")
173 }
174
175 self.http11 = match parser.version {
176 Some(0) => false,
177 Some(1) => true,
178 _ => Err(Error::InvalidHeaders)?,
179 };
180
181 let method_str = parser.method.ok_or(Error::InvalidHeaders)?;
182 self.method = Method::new(method_str).ok_or(Error::InvalidHeaders)?;
183 self.path = parser.path.ok_or(Error::InvalidHeaders)?;
184
185 trace!("Received:\n{}", self);
186
187 Ok((body_buf, read_len - headers_len))
188 } else {
189 unreachable!("Secondary parse of already loaded buffer failed.")
190 }
191 }
192
193 pub fn resolve<E>(&self) -> Result<(ConnectionType, BodyType), Error<E>> {
195 self.headers.resolve::<E>(None, true, self.http11)
196 }
197
198 pub async fn send<W>(
200 &self,
201 chunked_if_unspecified: bool,
202 mut output: W,
203 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
204 where
205 W: Write,
206 {
207 send_request(self.http11, self.method, self.path, &mut output).await?;
208
209 self.headers
210 .send(None, true, self.http11, chunked_if_unspecified, output)
211 .await
212 }
213}
214
215impl<'b, const N: usize> ResponseHeaders<'b, N> {
216 pub async fn receive<R>(
218 &mut self,
219 buf: &'b mut [u8],
220 mut input: R,
221 exact: bool,
222 ) -> Result<(&'b mut [u8], usize), Error<R::Error>>
223 where
224 R: Read,
225 {
226 let (read_len, headers_len) =
227 raw::read_reply_buf::<N, _>(&mut input, buf, false, exact).await?;
228
229 let mut parser = httparse::Response::new(&mut self.headers.0);
230
231 let (headers_buf, body_buf) = buf.split_at_mut(headers_len);
232
233 let status = parser.parse(headers_buf).map_err(Error::from)?;
234
235 if let Status::Complete(headers_len2) = status {
236 if headers_len != headers_len2 {
237 unreachable!("Should not happen. HTTP header parsing is indeterminate.")
238 }
239
240 self.http11 = match parser.version {
241 Some(0) => false,
242 Some(1) => true,
243 _ => Err(Error::InvalidHeaders)?,
244 };
245
246 self.code = parser.code.ok_or(Error::InvalidHeaders)?;
247 self.reason = parser.reason;
248
249 trace!("Received:\n{}", self);
250
251 Ok((body_buf, read_len - headers_len))
252 } else {
253 unreachable!("Secondary parse of already loaded buffer failed.")
254 }
255 }
256
257 pub fn resolve<E>(
259 &self,
260 request_connection_type: ConnectionType,
261 ) -> Result<(ConnectionType, BodyType), Error<E>> {
262 self.headers
263 .resolve::<E>(Some(request_connection_type), false, self.http11)
264 }
265
266 pub async fn send<W>(
268 &self,
269 request_connection_type: ConnectionType,
270 chunked_if_unspecified: bool,
271 mut output: W,
272 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
273 where
274 W: Write,
275 {
276 send_status(self.http11, self.code, self.reason, &mut output).await?;
277
278 self.headers
279 .send(
280 Some(request_connection_type),
281 false,
282 self.http11,
283 chunked_if_unspecified,
284 output,
285 )
286 .await
287 }
288}
289
290pub(crate) async fn send_request<W>(
291 http11: bool,
292 method: Method,
293 path: &str,
294 mut output: W,
295) -> Result<(), Error<W::Error>>
296where
297 W: Write,
298{
299 output
302 .write_all(method.as_str().as_bytes())
303 .await
304 .map_err(Error::Io)?;
305 output.write_all(b" ").await.map_err(Error::Io)?;
306 output.write_all(path.as_bytes()).await.map_err(Error::Io)?;
307 output.write_all(b" ").await.map_err(Error::Io)?;
308 raw::send_version(&mut output, http11).await?;
309 output.write_all(b"\r\n").await.map_err(Error::Io)?;
310
311 Ok(())
312}
313
314pub(crate) async fn send_status<W>(
315 http11: bool,
316 status: u16,
317 reason: Option<&str>,
318 mut output: W,
319) -> Result<(), Error<W::Error>>
320where
321 W: Write,
322{
323 raw::send_version(&mut output, http11).await?;
326 output.write_all(b" ").await.map_err(Error::Io)?;
327 let status_str: heapless::String<5> = unwrap!(status.try_into());
328 output
329 .write_all(status_str.as_bytes())
330 .await
331 .map_err(Error::Io)?;
332 output.write_all(b" ").await.map_err(Error::Io)?;
333 if let Some(reason) = reason {
334 output
335 .write_all(reason.as_bytes())
336 .await
337 .map_err(Error::Io)?;
338 }
339 output.write_all(b"\r\n").await.map_err(Error::Io)?;
340
341 Ok(())
342}
343
344pub(crate) async fn send_headers<'a, H, W>(
345 headers: H,
346 carry_over_connection_type: Option<ConnectionType>,
347 request: bool,
348 http11: bool,
349 chunked_if_unspecified: bool,
350 mut output: W,
351) -> Result<(ConnectionType, BodyType), Error<W::Error>>
352where
353 W: Write,
354 H: IntoIterator<Item = &'a (&'a str, &'a str)>,
355{
356 let (headers_connection_type, headers_body_type) = raw::send_headers(
357 headers
358 .into_iter()
359 .map(|(name, value)| (*name, value.as_bytes())),
360 &mut output,
361 )
362 .await?;
363
364 send_headers_end(
365 headers_connection_type,
366 headers_body_type,
367 carry_over_connection_type,
368 request,
369 http11,
370 chunked_if_unspecified,
371 output,
372 )
373 .await
374}
375
376async fn send_headers_end<W>(
377 headers_connection_type: Option<ConnectionType>,
378 headers_body_type: Option<BodyType>,
379 carry_over_connection_type: Option<ConnectionType>,
380 request: bool,
381 http11: bool,
382 chunked_if_unspecified: bool,
383 mut output: W,
384) -> Result<(ConnectionType, BodyType), Error<W::Error>>
385where
386 W: Write,
387{
388 let connection_type =
389 ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
390
391 let body_type = BodyType::resolve(
392 headers_body_type,
393 connection_type,
394 request,
395 http11,
396 chunked_if_unspecified,
397 )?;
398
399 if headers_connection_type.is_none() {
400 let (name, value) = connection_type.raw_header();
402
403 raw::send_header(name, value, &mut output).await?;
404 }
405
406 if headers_body_type.is_none() {
407 let mut buf = heapless::String::new();
408
409 if let Some((name, value)) = body_type.raw_header(&mut buf) {
410 raw::send_header(name, value, &mut output).await?;
412 }
413 }
414
415 raw::send_headers_end(output).await?;
416
417 Ok((connection_type, body_type))
418}
419
420impl<const N: usize> Headers<'_, N> {
421 fn resolve<E>(
422 &self,
423 carry_over_connection_type: Option<ConnectionType>,
424 request: bool,
425 http11: bool,
426 ) -> Result<(ConnectionType, BodyType), Error<E>> {
427 let headers_connection_type = ConnectionType::from_headers(self.iter());
428 let headers_body_type = BodyType::from_headers(self.iter());
429
430 let connection_type =
431 ConnectionType::resolve(headers_connection_type, carry_over_connection_type, http11)?;
432 let body_type =
433 BodyType::resolve(headers_body_type, connection_type, request, http11, false)?;
434
435 Ok((connection_type, body_type))
436 }
437
438 async fn send<W>(
439 &self,
440 carry_over_connection_type: Option<ConnectionType>,
441 request: bool,
442 http11: bool,
443 chunked_if_unspecified: bool,
444 mut output: W,
445 ) -> Result<(ConnectionType, BodyType), Error<W::Error>>
446 where
447 W: Write,
448 {
449 let (headers_connection_type, headers_body_type) =
450 raw::send_headers(self.iter_raw(), &mut output).await?;
451
452 send_headers_end(
453 headers_connection_type,
454 headers_body_type,
455 carry_over_connection_type,
456 request,
457 http11,
458 chunked_if_unspecified,
459 output,
460 )
461 .await
462 }
463}
464
/// A reader for the body of an HTTP request or response.
///
/// `R` is the underlying reader and `'b` the lifetime of the buffer that holds
/// the data which was already read from it.
// The variant payloads are crate-private types, hence the lint exemption
#[allow(private_interfaces)]
pub enum Body<'b, R> {
    /// A body read as-is from the underlying reader, with no framing
    Raw(PartiallyRead<'b, R>),
    /// A body limited to a given content length
    ContentLen(ContentLenRead<PartiallyRead<'b, R>>),
    /// A body using the chunked encoding
    Chunked(ChunkedRead<'b, PartiallyRead<'b, R>>),
}
477
478impl<'b, R> Body<'b, R>
479where
480 R: Read,
481{
482 pub fn new(body_type: BodyType, buf: &'b mut [u8], read_len: usize, input: R) -> Self {
490 match body_type {
491 BodyType::Chunked => Body::Chunked(ChunkedRead::new(
492 PartiallyRead::new(&[], input),
493 buf,
494 read_len,
495 )),
496 BodyType::ContentLen(content_len) => Body::ContentLen(ContentLenRead::new(
497 content_len,
498 PartiallyRead::new(&buf[..read_len], input),
499 )),
500 BodyType::Raw => Body::Raw(PartiallyRead::new(&buf[..read_len], input)),
501 }
502 }
503
504 pub fn needs_close(&self) -> bool {
506 !self.is_complete() || matches!(self, Self::Raw(_))
507 }
508
509 pub fn is_complete(&self) -> bool {
511 match self {
512 Self::Raw(_) => true,
513 Self::ContentLen(r) => r.is_complete(),
514 Self::Chunked(r) => r.is_complete(),
515 }
516 }
517
518 pub fn as_raw_reader(&mut self) -> &mut R {
520 match self {
521 Self::Raw(r) => &mut r.input,
522 Self::ContentLen(r) => &mut r.input.input,
523 Self::Chunked(r) => &mut r.input.input,
524 }
525 }
526
527 pub fn release(self) -> R {
529 match self {
530 Self::Raw(r) => r.release(),
531 Self::ContentLen(r) => r.release().release(),
532 Self::Chunked(r) => r.release().release(),
533 }
534 }
535}
536
/// Reading a body fails with an [`Error`] wrapping the error of the underlying reader.
impl<R> ErrorType for Body<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
543
544impl<R> Read for Body<'_, R>
545where
546 R: Read,
547{
548 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
549 match self {
550 Self::Raw(read) => Ok(read.read(buf).await.map_err(Error::Io)?),
551 Self::ContentLen(read) => Ok(read.read(buf).await?),
552 Self::Chunked(read) => Ok(read.read(buf).await?),
553 }
554 }
555}
556
/// A reader which first serves data that was already read into a buffer,
/// and only then continues reading from the underlying reader.
pub(crate) struct PartiallyRead<'b, R> {
    /// The already read data
    buf: &'b [u8],
    /// How many bytes of `buf` were served so far
    read_len: usize,
    /// The underlying reader
    input: R,
}
562
563impl<'b, R> PartiallyRead<'b, R> {
564 pub const fn new(buf: &'b [u8], input: R) -> Self {
565 Self {
566 buf,
567 read_len: 0,
568 input,
569 }
570 }
571
572 pub fn release(self) -> R {
581 self.input
582 }
583}
584
/// The error type is the one of the underlying reader, unchanged.
impl<R> ErrorType for PartiallyRead<'_, R>
where
    R: ErrorType,
{
    type Error = R::Error;
}
591
592impl<R> Read for PartiallyRead<'_, R>
593where
594 R: Read,
595{
596 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
597 if self.buf.len() > self.read_len {
598 let len = min(buf.len(), self.buf.len() - self.read_len);
599 buf[..len].copy_from_slice(&self.buf[self.read_len..self.read_len + len]);
600
601 self.read_len += len;
602
603 Ok(len)
604 } else {
605 Ok(self.input.read(buf).await?)
606 }
607 }
608}
609
/// A reader which reads at most `content_len` bytes from the underlying reader.
pub(crate) struct ContentLenRead<R> {
    /// The total number of bytes the body consists of
    content_len: u64,
    /// The number of bytes read so far
    read_len: u64,
    /// The underlying reader
    input: R,
}
615
616impl<R> ContentLenRead<R> {
617 pub const fn new(content_len: u64, input: R) -> Self {
618 Self {
619 content_len,
620 read_len: 0,
621 input,
622 }
623 }
624
625 pub fn is_complete(&self) -> bool {
626 self.content_len == self.read_len
627 }
628
629 pub fn release(self) -> R {
630 self.input
631 }
632}
633
/// Reading fails with an [`Error`] wrapping the error of the underlying reader.
impl<R> ErrorType for ContentLenRead<R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
640
641impl<R> Read for ContentLenRead<R>
642where
643 R: Read,
644{
645 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
646 let len = min(buf.len() as _, self.content_len - self.read_len);
647 if len > 0 {
648 let read = self
649 .input
650 .read(&mut buf[..len as _])
651 .await
652 .map_err(Error::Io)?;
653 self.read_len += read as u64;
654
655 Ok(read)
656 } else {
657 Ok(0)
658 }
659 }
660}
661
/// A reader which decodes a body sent with the chunked encoding.
pub(crate) struct ChunkedRead<'b, R> {
    /// The working buffer holding raw (still encoded) data read from `input`
    buf: &'b mut [u8],
    /// The position in `buf` of the next raw byte to process
    buf_offset: usize,
    /// The number of valid raw bytes in `buf`
    buf_len: usize,
    /// The underlying reader
    input: R,
    /// The number of data bytes of the current chunk which are not delivered yet
    remain: u64,
    /// Whether the end of the body was reached
    complete: bool,
}
670
671impl<'b, R> ChunkedRead<'b, R>
672where
673 R: Read,
674{
675 pub fn new(input: R, buf: &'b mut [u8], buf_len: usize) -> Self {
676 Self {
677 buf,
678 buf_offset: 0,
679 buf_len,
680 input,
681 remain: 0,
682 complete: false,
683 }
684 }
685
686 pub fn is_complete(&self) -> bool {
687 self.complete
688 }
689
690 pub fn release(self) -> R {
691 self.input
692 }
693
694 async fn next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
702 if self.complete {
703 return Ok(None);
704 }
705
706 if self.remain == 0 {
707 if let Some(size) = self.parse_size().await? {
708 if size == 0 {
710 self.consume_trailer().await?;
711 self.complete = true;
712 return Ok(None);
713 }
714
715 self.remain = size;
716 } else {
717 self.complete = true;
718 return Ok(None);
719 }
720 }
721
722 let next = self.input_fetch().await?;
723 self.remain -= 1;
724
725 if self.remain == 0 {
727 self.consume_multi(b"\r\n").await?;
728 }
729
730 Ok(Some(next))
731 }
732
733 async fn parse_size(&mut self) -> Result<Option<u64>, Error<R::Error>> {
735 let mut digits = [0_u8; 16];
736
737 let slice = match self.parse_digits(&mut digits[..]).await? {
738 Some(s) => str::from_utf8(s).map_err(|_| Error::InvalidBody)?,
739 None => return Ok(None),
740 };
741
742 let size = u64::from_str_radix(slice, 16).map_err(|_| Error::InvalidBody)?;
743
744 Ok(Some(size))
745 }
746
747 async fn parse_digits<'a>(
749 &'a mut self,
750 digits: &'a mut [u8],
751 ) -> Result<Option<&'a [u8]>, Error<R::Error>> {
752 let mut len = 0;
754
755 loop {
756 let b = match self.input_next().await? {
757 Some(b) => b,
758 None => {
759 return if len == 0 {
760 Ok(None)
762 } else {
763 Err(Error::IncompleteBody)
764 };
765 }
766 };
767
768 match b {
769 b'\r' => {
770 self.consume(b'\n').await?;
771 break;
772 }
773 b';' => {
774 self.consume_ext().await?;
775 break;
776 }
777 _ => {
778 match digits.get_mut(len) {
779 Some(d) => *d = b,
780 None => return Err(Error::InvalidBody),
781 }
782
783 len += 1;
784 }
785 }
786 }
787
788 Ok(Some(&digits[..len]))
789 }
790
791 async fn consume_ext(&mut self) -> Result<(), Error<R::Error>> {
794 self.consume_header().await?;
795
796 Ok(())
797 }
798
799 async fn consume_trailer(&mut self) -> Result<(), Error<R::Error>> {
801 while self.consume_header().await? {}
802
803 Ok(())
804 }
805
806 async fn consume_header(&mut self) -> Result<bool, Error<R::Error>> {
808 let mut first = self.input_fetch().await?;
809 let mut len = 1;
810
811 loop {
812 let second = self.input_fetch().await?;
813 len += 1;
814
815 if first == b'\r' && second == b'\n' {
816 return Ok(len > 2);
817 }
818
819 first = second;
820 }
821 }
822
823 async fn consume_multi(&mut self, bytes: &[u8]) -> Result<(), Error<R::Error>> {
825 for byte in bytes {
826 self.consume(*byte).await?;
827 }
828
829 Ok(())
830 }
831
832 async fn consume(&mut self, byte: u8) -> Result<(), Error<R::Error>> {
834 if self.input_fetch().await? == byte {
835 Ok(())
836 } else {
837 Err(Error::InvalidBody)
838 }
839 }
840
841 async fn input_fetch(&mut self) -> Result<u8, Error<R::Error>> {
842 self.input_next().await?.ok_or(Error::IncompleteBody)
843 }
844
845 async fn input_next(&mut self) -> Result<Option<u8>, Error<R::Error>> {
846 if self.buf_offset == self.buf_len {
847 self.buf_len = self.input.read(self.buf).await.map_err(Error::Io)?;
848 self.buf_offset = 0;
849 }
850
851 if self.buf_len > 0 {
852 let byte = self.buf[self.buf_offset];
853 self.buf_offset += 1;
854
855 Ok(Some(byte))
856 } else {
857 Ok(None)
858 }
859 }
860}
861
/// Reading fails with an [`Error`] wrapping the error of the underlying reader.
impl<R> ErrorType for ChunkedRead<'_, R>
where
    R: ErrorType,
{
    type Error = Error<R::Error>;
}
868
869impl<R> Read for ChunkedRead<'_, R>
870where
871 R: Read,
872{
873 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
874 for (index, byte_pos) in buf.iter_mut().enumerate() {
875 if let Some(byte) = self.next().await? {
876 *byte_pos = byte;
877 } else {
878 return Ok(index);
879 }
880 }
881
882 Ok(buf.len())
883 }
884}
885
/// A writer for the body of an HTTP request or response.
///
/// `W` is the underlying writer.
// The variant payloads are crate-private types, hence the lint exemption
#[allow(private_interfaces)]
pub enum SendBody<W> {
    /// A body written as-is to the underlying writer, with no framing
    Raw(W),
    /// A body of a given content length
    ContentLen(ContentLenWrite<W>),
    /// A body using the chunked encoding
    Chunked(ChunkedWrite<W>),
}
898
899impl<W> SendBody<W>
900where
901 W: Write,
902{
903 pub fn new(body_type: BodyType, output: W) -> SendBody<W> {
909 match body_type {
910 BodyType::Chunked => SendBody::Chunked(ChunkedWrite::new(output)),
911 BodyType::ContentLen(content_len) => {
912 SendBody::ContentLen(ContentLenWrite::new(content_len, output))
913 }
914 BodyType::Raw => SendBody::Raw(output),
915 }
916 }
917
918 pub fn is_complete(&self) -> bool {
920 match self {
921 Self::ContentLen(w) => w.is_complete(),
922 _ => true,
923 }
924 }
925
926 pub fn needs_close(&self) -> bool {
928 !self.is_complete() || matches!(self, Self::Raw(_))
929 }
930
931 pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
933 where
934 W: Write,
935 {
936 match self {
937 Self::Raw(_) => (),
938 Self::ContentLen(w) => {
939 if !w.is_complete() {
940 return Err(Error::IncompleteBody);
941 }
942 }
943 Self::Chunked(w) => w.finish().await?,
944 }
945
946 self.flush().await?;
947
948 Ok(())
949 }
950
951 pub fn as_raw_writer(&mut self) -> &mut W {
953 match self {
954 Self::Raw(w) => w,
955 Self::ContentLen(w) => &mut w.output,
956 Self::Chunked(w) => &mut w.output,
957 }
958 }
959
960 pub fn release(self) -> W {
962 match self {
963 Self::Raw(w) => w,
964 Self::ContentLen(w) => w.release(),
965 Self::Chunked(w) => w.release(),
966 }
967 }
968}
969
/// Writing a body fails with an [`Error`] wrapping the error of the underlying writer.
impl<W> ErrorType for SendBody<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
976
977impl<W> Write for SendBody<W>
978where
979 W: Write,
980{
981 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
982 match self {
983 Self::Raw(w) => Ok(w.write(buf).await.map_err(Error::Io)?),
984 Self::ContentLen(w) => Ok(w.write(buf).await?),
985 Self::Chunked(w) => Ok(w.write(buf).await?),
986 }
987 }
988
989 async fn flush(&mut self) -> Result<(), Self::Error> {
990 match self {
991 Self::Raw(w) => Ok(w.flush().await.map_err(Error::Io)?),
992 Self::ContentLen(w) => Ok(w.flush().await?),
993 Self::Chunked(w) => Ok(w.flush().await?),
994 }
995 }
996}
997
/// A writer which accepts at most `content_len` bytes in total.
pub(crate) struct ContentLenWrite<W> {
    /// The total number of bytes the body consists of
    content_len: u64,
    /// The number of bytes written so far
    write_len: u64,
    /// The underlying writer
    output: W,
}
1003
1004impl<W> ContentLenWrite<W> {
1005 pub const fn new(content_len: u64, output: W) -> Self {
1006 Self {
1007 content_len,
1008 write_len: 0,
1009 output,
1010 }
1011 }
1012
1013 pub fn is_complete(&self) -> bool {
1014 self.content_len == self.write_len
1015 }
1016
1017 pub fn release(self) -> W {
1018 self.output
1019 }
1020}
1021
/// Writing fails with an [`Error`] wrapping the error of the underlying writer.
impl<W> ErrorType for ContentLenWrite<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
1028
1029impl<W> Write for ContentLenWrite<W>
1030where
1031 W: Write,
1032{
1033 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1034 if self.content_len >= self.write_len + buf.len() as u64 {
1035 let write = self.output.write(buf).await.map_err(Error::Io)?;
1036 self.write_len += write as u64;
1037
1038 Ok(write)
1039 } else {
1040 Err(Error::TooLongBody)
1041 }
1042 }
1043
1044 async fn flush(&mut self) -> Result<(), Self::Error> {
1045 self.output.flush().await.map_err(Error::Io)
1046 }
1047}
1048
/// A writer which sends the data written to it using the chunked encoding.
pub(crate) struct ChunkedWrite<W> {
    /// The underlying writer
    output: W,
    /// Whether the terminating chunk was already written
    finished: bool,
}
1053
1054impl<W> ChunkedWrite<W> {
1055 pub const fn new(output: W) -> Self {
1056 Self {
1057 output,
1058 finished: false,
1059 }
1060 }
1061
1062 pub async fn finish(&mut self) -> Result<(), Error<W::Error>>
1063 where
1064 W: Write,
1065 {
1066 if !self.finished {
1067 self.output
1068 .write_all(b"0\r\n\r\n")
1069 .await
1070 .map_err(Error::Io)?;
1071 self.finished = true;
1072 }
1073
1074 Ok(())
1075 }
1076
1077 pub fn release(self) -> W {
1078 self.output
1079 }
1080}
1081
/// Writing fails with an [`Error`] wrapping the error of the underlying writer.
impl<W> ErrorType for ChunkedWrite<W>
where
    W: ErrorType,
{
    type Error = Error<W::Error>;
}
1088
1089impl<W> Write for ChunkedWrite<W>
1090where
1091 W: Write,
1092{
1093 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1094 if self.finished {
1095 Err(Error::InvalidState)
1096 } else if !buf.is_empty() {
1097 let mut len_str = heapless::String::<8>::new();
1098 write_unwrap!(&mut len_str, "{:x}", buf.len());
1099
1100 self.output
1101 .write_all(len_str.as_bytes())
1102 .await
1103 .map_err(Error::Io)?;
1104
1105 self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1106 self.output.write_all(buf).await.map_err(Error::Io)?;
1107 self.output.write_all(b"\r\n").await.map_err(Error::Io)?;
1108
1109 Ok(buf.len())
1110 } else {
1111 Ok(0)
1112 }
1113 }
1114
1115 async fn flush(&mut self) -> Result<(), Self::Error> {
1116 self.output.flush().await.map_err(Error::Io)
1117 }
1118}
1119
1120mod raw {
1121 use core::str;
1122
1123 use embedded_io_async::{Read, Write};
1124
1125 use crate::{BodyType, ConnectionType};
1126
1127 use super::Error;
1128
1129 pub(crate) async fn read_reply_buf<const N: usize, R>(
1130 mut input: R,
1131 buf: &mut [u8],
1132 request: bool,
1133 exact: bool,
1134 ) -> Result<(usize, usize), Error<R::Error>>
1135 where
1136 R: Read,
1137 {
1138 if exact {
1139 let raw_headers_len = read_headers(&mut input, buf).await?;
1140
1141 let mut headers = [httparse::EMPTY_HEADER; N];
1142
1143 let status = if request {
1144 httparse::Request::new(&mut headers).parse(&buf[..raw_headers_len])?
1145 } else {
1146 httparse::Response::new(&mut headers).parse(&buf[..raw_headers_len])?
1147 };
1148
1149 if let httparse::Status::Complete(headers_len) = status {
1150 return Ok((raw_headers_len, headers_len));
1151 }
1152
1153 Err(Error::TooManyHeaders)
1154 } else {
1155 let mut offset = 0;
1156 let mut size = 0;
1157
1158 while buf.len() > size {
1159 let read = input.read(&mut buf[offset..]).await.map_err(Error::Io)?;
1160 if read == 0 {
1161 Err(if offset == 0 {
1162 Error::ConnectionClosed
1163 } else {
1164 Error::IncompleteHeaders
1165 })?;
1166 }
1167
1168 offset += read;
1169 size += read;
1170
1171 let mut headers = [httparse::EMPTY_HEADER; N];
1172
1173 let status = if request {
1174 httparse::Request::new(&mut headers).parse(&buf[..size])?
1175 } else {
1176 httparse::Response::new(&mut headers).parse(&buf[..size])?
1177 };
1178
1179 if let httparse::Status::Complete(headers_len) = status {
1180 return Ok((size, headers_len));
1181 }
1182 }
1183
1184 Err(Error::TooManyHeaders)
1185 }
1186 }
1187
1188 pub(crate) async fn read_headers<R>(
1189 mut input: R,
1190 buf: &mut [u8],
1191 ) -> Result<usize, Error<R::Error>>
1192 where
1193 R: Read,
1194 {
1195 let mut offset = 0;
1196 let mut byte = [0];
1197
1198 loop {
1199 if offset == buf.len() {
1200 Err(Error::TooLongHeaders)?;
1201 }
1202
1203 let read = input.read(&mut byte).await.map_err(Error::Io)?;
1204
1205 if read == 0 {
1206 Err(if offset == 0 {
1207 Error::ConnectionClosed
1208 } else {
1209 Error::IncompleteHeaders
1210 })?;
1211 }
1212
1213 buf[offset] = byte[0];
1214
1215 offset += 1;
1216
1217 if offset >= b"\r\n\r\n".len() && buf[offset - 4..offset] == *b"\r\n\r\n" {
1218 break Ok(offset);
1219 }
1220 }
1221 }
1222
1223 pub(crate) async fn send_version<W>(mut output: W, http11: bool) -> Result<(), Error<W::Error>>
1224 where
1225 W: Write,
1226 {
1227 output
1228 .write_all(if http11 { b"HTTP/1.1" } else { b"HTTP/1.0" })
1229 .await
1230 .map_err(Error::Io)
1231 }
1232
1233 pub(crate) async fn send_headers<'a, H, W>(
1234 headers: H,
1235 mut output: W,
1236 ) -> Result<(Option<ConnectionType>, Option<BodyType>), Error<W::Error>>
1237 where
1238 W: Write,
1239 H: IntoIterator<Item = (&'a str, &'a [u8])>,
1240 {
1241 let mut connection = None;
1242 let mut body = None;
1243
1244 for (name, value) in headers.into_iter() {
1245 let header_connection =
1246 ConnectionType::from_header(name, str::from_utf8(value).unwrap_or(""));
1247
1248 if let Some(header_connection) = header_connection {
1249 if let Some(connection) = connection {
1250 warn!(
1251 "Multiple Connection headers found. Current {} and new {}",
1252 connection, header_connection
1253 );
1254 }
1255
1256 connection = Some(header_connection);
1258 }
1259
1260 let header_body = BodyType::from_header(name, str::from_utf8(value).unwrap_or(""));
1261
1262 if let Some(header_body) = header_body {
1263 if let Some(body) = body {
1264 warn!(
1265 "Multiple body type headers found. Current {} and new {}",
1266 body, header_body
1267 );
1268 }
1269
1270 body = Some(header_body);
1272 }
1273
1274 send_header(name, value, &mut output).await?;
1275 }
1276
1277 Ok((connection, body))
1278 }
1279
1280 pub(crate) async fn send_header<W>(
1281 name: &str,
1282 value: &[u8],
1283 mut output: W,
1284 ) -> Result<(), Error<W::Error>>
1285 where
1286 W: Write,
1287 {
1288 output.write_all(name.as_bytes()).await.map_err(Error::Io)?;
1289 output.write_all(b": ").await.map_err(Error::Io)?;
1290 output.write_all(value).await.map_err(Error::Io)?;
1291 output.write_all(b"\r\n").await.map_err(Error::Io)?;
1292
1293 Ok(())
1294 }
1295
1296 pub(crate) async fn send_headers_end<W>(mut output: W) -> Result<(), Error<W::Error>>
1297 where
1298 W: Write,
1299 {
1300 output.write_all(b"\r\n").await.map_err(Error::Io)
1301 }
1302}
1303
/// Tests of the chunked body decoder.
#[cfg(test)]
mod test {
    use embedded_io_async::{ErrorType, Read};

    use super::*;

    /// A reader which serves the content of a byte slice and never fails.
    struct SliceRead<'a>(&'a [u8]);

    impl<'a> ErrorType for SliceRead<'a> {
        type Error = core::convert::Infallible;
    }

    impl<'a> Read for SliceRead<'a> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            let len = core::cmp::min(buf.len(), self.0.len());
            buf[..len].copy_from_slice(&self.0[..len]);

            // Drop what was just served
            self.0 = &self.0[len..];

            Ok(len)
        }
    }

    #[test]
    fn test_chunked_bytes() {
        // Multiple chunks; upper and lower case hexadecimal sizes
        expect(b"A\r\nabcdefghij\r\n2\r\n42\r\n", Some(b"abcdefghij42"));
        // A line terminator inside the chunk data is part of the data
        expect(b"a\r\nabc\r\nfghij\r\n2\r\n42\r\n", Some(b"abc\r\nfghij42"));

        // Terminating chunk, without and with a trailer
        expect(b"4\r\nabcd\r\n0\r\n\r\n", Some(b"abcd"));
        expect(b"4\r\nabcd\r\n0\r\nA: B\r\n\r\n", Some(b"abcd"));

        // Empty bodies: no input at all, and just a terminating chunk
        expect(b"", Some(b""));
        expect(b"0\r\n\r\n", Some(b""));

        // Invalid input: bad size digit, missing size, data longer than the chunk size
        expect(b"h\r\n", None);
        expect(b"\r\na", None);
        expect(b"4\r\nabcdefg", None);
    }

    /// Decodes `input` with a `ChunkedRead`.
    ///
    /// With `Some(expected)`, asserts that exactly `expected` is decoded and that the body
    /// ends right after it; with `None`, asserts that reading fails.
    fn expect(input: &[u8], expected: Option<&[u8]>) {
        embassy_futures::block_on(async move {
            let mut buf1 = [0; 64];
            let mut buf2 = [0; 64];

            let stream = SliceRead(input);
            let mut r = ChunkedRead::new(stream, &mut buf1, 0);

            if let Some(expected) = expected {
                assert!(r.read_exact(&mut buf2[..expected.len()]).await.is_ok());

                assert_eq!(&buf2[..expected.len()], expected);

                // Nothing must follow the expected data
                let len = r.read(&mut buf2).await;
                assert!(len.is_ok());

                assert_eq!(unwrap!(len), 0);
            } else {
                assert!(r.read(&mut buf2).await.is_err());
            }
        })
    }
}