1use crate::parser::{BodyLength, ParseError};
31
/// Default hard cap on a decoded request body: 1 MiB.
pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;
34
/// Limits and sizing hints used when reading a request body.
#[derive(Debug, Clone)]
pub struct BodyConfig {
    /// Maximum number of decoded body bytes accepted before erroring.
    max_size: usize,
    /// Starting capacity for buffers that accumulate the decoded body.
    initial_capacity: usize,
}
43
44impl Default for BodyConfig {
45 fn default() -> Self {
46 Self {
47 max_size: DEFAULT_MAX_BODY_SIZE,
48 initial_capacity: 4096,
49 }
50 }
51}
52
53impl BodyConfig {
54 #[must_use]
56 pub fn new() -> Self {
57 Self::default()
58 }
59
60 #[must_use]
62 pub fn with_max_size(mut self, size: usize) -> Self {
63 self.max_size = size;
64 self
65 }
66
67 #[must_use]
69 pub fn with_initial_capacity(mut self, capacity: usize) -> Self {
70 self.initial_capacity = capacity;
71 self
72 }
73
74 #[must_use]
76 pub fn max_size(&self) -> usize {
77 self.max_size
78 }
79
80 #[must_use]
82 pub fn initial_capacity(&self) -> usize {
83 self.initial_capacity
84 }
85}
86
/// Errors produced while reading or decoding a request body.
#[derive(Debug)]
pub enum BodyError {
    /// The body exceeds the configured maximum size.
    TooLarge {
        /// Size that was declared or accumulated so far.
        size: usize,
        /// Configured limit that was exceeded.
        max: usize,
    },
    /// The chunked transfer encoding is malformed.
    InvalidChunkedEncoding {
        /// Static description of the malformation.
        detail: &'static str,
    },
    /// The buffer ended before the full body arrived.
    Incomplete {
        /// Bytes received so far.
        received: usize,
        /// Total expected bytes, when known (`None` for chunked bodies).
        expected: Option<usize>,
    },
    /// The input ended unexpectedly.
    UnexpectedEof,
    /// An underlying request/header parse error.
    Parse(ParseError),
}
114
115impl std::fmt::Display for BodyError {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self {
118 Self::TooLarge { size, max } => {
119 write!(f, "body too large: {size} bytes exceeds limit of {max}")
120 }
121 Self::InvalidChunkedEncoding { detail } => {
122 write!(f, "invalid chunked encoding: {detail}")
123 }
124 Self::Incomplete { received, expected } => {
125 if let Some(exp) = expected {
126 write!(f, "incomplete body: received {received} of {exp} bytes")
127 } else {
128 write!(f, "incomplete body: received {received} bytes")
129 }
130 }
131 Self::UnexpectedEof => write!(f, "unexpected end of body"),
132 Self::Parse(e) => write!(f, "parse error: {e}"),
133 }
134 }
135}
136
137impl std::error::Error for BodyError {
138 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
139 match self {
140 Self::Parse(e) => Some(e),
141 _ => None,
142 }
143 }
144}
145
146impl From<ParseError> for BodyError {
147 fn from(e: ParseError) -> Self {
148 Self::Parse(e)
149 }
150}
151
/// Reads a body whose size is fixed in advance by a `Content-Length` header.
///
/// Borrows the already-received bytes; nothing is copied until one of the
/// `read_*` methods is called.
#[derive(Debug)]
pub struct ContentLengthReader<'a> {
    /// Raw bytes available for the body (may be shorter than `length`).
    buffer: &'a [u8],
    /// Declared body length in bytes.
    length: usize,
    /// Cursor into `buffer`; bytes before it have already been handed out.
    position: usize,
    /// Kept for parity with `ChunkedReader`; not read after construction.
    #[allow(dead_code)]
    config: BodyConfig,
}
169
170impl<'a> ContentLengthReader<'a> {
171 pub fn new(buffer: &'a [u8], length: usize, config: &BodyConfig) -> Result<Self, BodyError> {
183 if length > config.max_size {
185 return Err(BodyError::TooLarge {
186 size: length,
187 max: config.max_size,
188 });
189 }
190
191 Ok(Self {
192 buffer,
193 length,
194 position: 0,
195 config: config.clone(),
196 })
197 }
198
199 #[must_use]
201 pub fn length(&self) -> usize {
202 self.length
203 }
204
205 #[must_use]
207 pub fn remaining(&self) -> usize {
208 self.length.saturating_sub(self.position)
209 }
210
211 #[must_use]
213 pub fn is_complete(&self) -> bool {
214 self.position >= self.length
215 }
216
217 pub fn read(&mut self, dest: &mut [u8]) -> Result<usize, BodyError> {
221 if self.is_complete() {
222 return Ok(0);
223 }
224
225 let available = self.buffer.len().saturating_sub(self.position);
226 let to_read = dest.len().min(self.remaining()).min(available);
227
228 if to_read == 0 && !self.is_complete() {
229 return Err(BodyError::Incomplete {
230 received: self.position,
231 expected: Some(self.length),
232 });
233 }
234
235 dest[..to_read].copy_from_slice(&self.buffer[self.position..self.position + to_read]);
236 self.position += to_read;
237
238 Ok(to_read)
239 }
240
241 pub fn read_all(&mut self) -> Result<Vec<u8>, BodyError> {
247 if self.buffer.len() < self.length {
248 return Err(BodyError::Incomplete {
249 received: self.buffer.len(),
250 expected: Some(self.length),
251 });
252 }
253
254 let body = self.buffer[..self.length].to_vec();
255 self.position = self.length;
256 Ok(body)
257 }
258
259 pub fn read_all_borrowed(&self) -> Result<&'a [u8], BodyError> {
267 if self.buffer.len() < self.length {
268 return Err(BodyError::Incomplete {
269 received: self.buffer.len(),
270 expected: Some(self.length),
271 });
272 }
273
274 Ok(&self.buffer[..self.length])
275 }
276}
277
/// States of the synchronous chunked-transfer-decoding state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedState {
    /// Expecting a chunk-size line (hex digits, optional extension, CRLF).
    ChunkSize,
    /// Copying chunk payload; `remaining` bytes of the current chunk are left.
    ChunkData { remaining: usize },
    /// Expecting the CRLF that terminates a chunk's payload.
    ChunkDataEnd,
    /// Consuming optional trailer lines after the zero-size chunk.
    Trailers,
    /// Final CRLF seen; the body is fully decoded.
    Complete,
}
296
/// Decodes a `Transfer-Encoding: chunked` body from a fully buffered slice.
#[derive(Debug)]
pub struct ChunkedReader<'a> {
    /// Raw, still-encoded input bytes.
    buffer: &'a [u8],
    /// Cursor into `buffer` (encoded bytes consumed so far).
    position: usize,
    /// Current position in the decoding state machine.
    state: ChunkedState,
    /// Total decoded payload bytes produced so far.
    total_size: usize,
    /// Limits applied while decoding (max size, initial capacity).
    config: BodyConfig,
}
315
316impl<'a> ChunkedReader<'a> {
317 #[must_use]
324 pub fn new(buffer: &'a [u8], config: &BodyConfig) -> Self {
325 Self {
326 buffer,
327 position: 0,
328 state: ChunkedState::ChunkSize,
329 total_size: 0,
330 config: config.clone(),
331 }
332 }
333
334 #[must_use]
336 pub fn is_complete(&self) -> bool {
337 self.state == ChunkedState::Complete
338 }
339
340 #[must_use]
342 pub fn total_size(&self) -> usize {
343 self.total_size
344 }
345
346 fn parse_chunk_size(&self) -> Result<(usize, usize), BodyError> {
348 let remaining = &self.buffer[self.position..];
349
350 let line_end =
352 remaining
353 .windows(2)
354 .position(|w| w == b"\r\n")
355 .ok_or(BodyError::Incomplete {
356 received: self.position,
357 expected: None,
358 })?;
359
360 let size_line = &remaining[..line_end];
361
362 let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';') {
364 &size_line[..semi]
365 } else {
366 size_line
367 };
368
369 let size_str =
370 std::str::from_utf8(size_str).map_err(|_| BodyError::InvalidChunkedEncoding {
371 detail: "invalid UTF-8 in chunk size",
372 })?;
373
374 let size = usize::from_str_radix(size_str.trim(), 16).map_err(|_| {
375 BodyError::InvalidChunkedEncoding {
376 detail: "invalid hex chunk size",
377 }
378 })?;
379
380 const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
383 if size > MAX_SINGLE_CHUNK {
384 return Err(BodyError::InvalidChunkedEncoding {
385 detail: "chunk size exceeds 16MB limit",
386 });
387 }
388
389 Ok((size, line_end + 2))
391 }
392
393 pub fn decode_all(&mut self) -> Result<Vec<u8>, BodyError> {
402 let mut output = Vec::with_capacity(self.config.initial_capacity);
403
404 loop {
405 match self.state {
406 ChunkedState::ChunkSize => {
407 let (size, consumed) = self.parse_chunk_size()?;
408 self.position += consumed;
409
410 let new_total = self.total_size.saturating_add(size);
412 if new_total > self.config.max_size {
413 return Err(BodyError::TooLarge {
414 size: new_total,
415 max: self.config.max_size,
416 });
417 }
418
419 if size == 0 {
420 self.state = ChunkedState::Trailers;
422 } else {
423 self.state = ChunkedState::ChunkData { remaining: size };
424 }
425 }
426 ChunkedState::ChunkData { remaining } => {
427 let available = self.buffer.len().saturating_sub(self.position);
428 if available < remaining {
429 return Err(BodyError::Incomplete {
430 received: self.total_size + (remaining - available),
431 expected: None,
432 });
433 }
434
435 let chunk_data = &self.buffer[self.position..self.position + remaining];
437 output.extend_from_slice(chunk_data);
438 self.position += remaining;
439 self.total_size += remaining;
440
441 self.state = ChunkedState::ChunkDataEnd;
442 }
443 ChunkedState::ChunkDataEnd => {
444 let remaining = &self.buffer[self.position..];
446 if remaining.len() < 2 {
447 return Err(BodyError::Incomplete {
448 received: self.total_size,
449 expected: None,
450 });
451 }
452
453 if &remaining[..2] != b"\r\n" {
454 return Err(BodyError::InvalidChunkedEncoding {
455 detail: "expected CRLF after chunk data",
456 });
457 }
458
459 self.position += 2;
460 self.state = ChunkedState::ChunkSize;
461 }
462 ChunkedState::Trailers => {
463 let remaining = &self.buffer[self.position..];
465
466 if remaining.starts_with(b"\r\n") {
468 self.position += 2;
469 self.state = ChunkedState::Complete;
470 } else {
471 let line_end = remaining.windows(2).position(|w| w == b"\r\n");
473 match line_end {
474 Some(pos) => {
475 self.position += pos + 2;
477 }
479 None => {
480 return Err(BodyError::Incomplete {
481 received: self.total_size,
482 expected: None,
483 });
484 }
485 }
486 }
487 }
488 ChunkedState::Complete => {
489 break;
490 }
491 }
492 }
493
494 Ok(output)
495 }
496
497 #[must_use]
499 pub fn bytes_consumed(&self) -> usize {
500 self.position
501 }
502}
503
504pub fn parse_body(
530 buffer: &[u8],
531 body_length: BodyLength,
532 config: &BodyConfig,
533) -> Result<Option<Vec<u8>>, BodyError> {
534 let (body, _) = parse_body_with_consumed(buffer, body_length, config)?;
535 Ok(body)
536}
537
538pub fn parse_body_with_consumed(
549 buffer: &[u8],
550 body_length: BodyLength,
551 config: &BodyConfig,
552) -> Result<(Option<Vec<u8>>, usize), BodyError> {
553 match body_length {
554 BodyLength::None => Ok((None, 0)),
555 BodyLength::ContentLength(len) => {
556 if len == 0 {
557 return Ok((Some(Vec::new()), 0));
558 }
559 let mut reader = ContentLengthReader::new(buffer, len, config)?;
560 let body = reader.read_all()?;
561 Ok((Some(body), len))
562 }
563 BodyLength::Chunked => {
564 let mut reader = ChunkedReader::new(buffer, config);
565 let body = reader.decode_all()?;
566 Ok((Some(body), reader.bytes_consumed()))
567 }
568 BodyLength::Conflicting => Err(BodyError::InvalidChunkedEncoding {
569 detail: "conflicting body length indicators",
570 }),
571 }
572}
573
574pub fn validate_content_length(
587 content_length: usize,
588 config: &BodyConfig,
589) -> Result<(), BodyError> {
590 if content_length > config.max_size {
591 return Err(BodyError::TooLarge {
592 size: content_length,
593 max: config.max_size,
594 });
595 }
596 Ok(())
597}
598
599use asupersync::io::AsyncRead;
604use asupersync::stream::Stream;
605use fastapi_core::RequestBodyStreamError;
606use std::pin::Pin;
607use std::task::{Context, Poll};
608
/// Bodies larger than this (64 KiB) are streamed rather than fully buffered.
pub const DEFAULT_STREAMING_THRESHOLD: usize = 64 * 1024;
613
/// Tuning knobs for streaming body delivery.
#[derive(Debug, Clone)]
pub struct StreamingBodyConfig {
    /// Bodies larger than this are streamed instead of fully buffered.
    pub streaming_threshold: usize,
    /// Maximum size of each chunk yielded by a streaming body.
    pub chunk_size: usize,
    /// Hard cap on the total number of body bytes delivered.
    pub max_size: usize,
}
624
625impl Default for StreamingBodyConfig {
626 fn default() -> Self {
627 Self {
628 streaming_threshold: DEFAULT_STREAMING_THRESHOLD,
629 chunk_size: 8 * 1024, max_size: DEFAULT_MAX_BODY_SIZE,
631 }
632 }
633}
634
635impl StreamingBodyConfig {
636 #[must_use]
638 pub fn new() -> Self {
639 Self::default()
640 }
641
642 #[must_use]
644 pub fn with_streaming_threshold(mut self, threshold: usize) -> Self {
645 self.streaming_threshold = threshold;
646 self
647 }
648
649 #[must_use]
654 pub fn with_chunk_size(mut self, size: usize) -> Self {
655 self.chunk_size = size.max(1); self
657 }
658
659 #[must_use]
661 pub fn with_max_size(mut self, size: usize) -> Self {
662 self.max_size = size;
663 self
664 }
665
666 #[must_use]
668 pub fn should_stream(&self, content_length: usize) -> bool {
669 content_length > self.streaming_threshold
670 }
671}
672
/// Async stream yielding a fixed-length body in `chunk_size` pieces.
///
/// Bytes already buffered alongside the request head (`initial_buffer`) are
/// served first; the remainder is pulled from the reader.
pub struct AsyncContentLengthStream<R> {
    /// Underlying reader; taken out while polling and dropped after EOF/error.
    reader: Option<R>,
    /// Body bytes received together with the request head.
    initial_buffer: Vec<u8>,
    /// Cursor into `initial_buffer`.
    initial_position: usize,
    /// Declared `Content-Length`.
    expected_size: usize,
    /// Total body bytes yielded so far.
    bytes_read: usize,
    /// Maximum size of each yielded chunk.
    chunk_size: usize,
    /// Hard cap on total bytes yielded.
    max_size: usize,
    /// Scratch buffer reused across reads.
    read_buffer: Vec<u8>,
    /// Set once all expected bytes have been delivered.
    complete: bool,
    /// Set after a failure; the stream is fused afterwards.
    error: bool,
}
705
706impl<R> AsyncContentLengthStream<R>
707where
708 R: AsyncRead + Unpin + Send + Sync + 'static,
709{
710 pub fn new(
719 initial_buffer: Vec<u8>,
720 reader: R,
721 content_length: usize,
722 config: &StreamingBodyConfig,
723 ) -> Self {
724 Self {
725 reader: Some(reader),
726 initial_buffer,
727 initial_position: 0,
728 expected_size: content_length,
729 bytes_read: 0,
730 chunk_size: config.chunk_size,
731 max_size: config.max_size,
732 read_buffer: vec![0u8; config.chunk_size],
733 complete: false,
734 error: false,
735 }
736 }
737
738 pub fn with_defaults(initial_buffer: Vec<u8>, reader: R, content_length: usize) -> Self {
740 Self::new(
741 initial_buffer,
742 reader,
743 content_length,
744 &StreamingBodyConfig::default(),
745 )
746 }
747
748 #[must_use]
750 pub fn expected_size(&self) -> usize {
751 self.expected_size
752 }
753
754 #[must_use]
756 pub fn bytes_read(&self) -> usize {
757 self.bytes_read
758 }
759
760 #[must_use]
762 pub fn remaining(&self) -> usize {
763 self.expected_size.saturating_sub(self.bytes_read)
764 }
765
766 #[must_use]
768 pub fn is_complete(&self) -> bool {
769 self.complete
770 }
771
772 fn initial_remaining(&self) -> usize {
773 self.initial_buffer
774 .len()
775 .saturating_sub(self.initial_position)
776 }
777}
778
impl<R> Stream for AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    /// Yields the next body chunk: first from the initial buffer, then from
    /// the reader. Terminal states (complete/error) make the stream fused.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fused after completion or any error.
        if self.complete || self.error {
            return Poll::Ready(None);
        }

        // Size cap on everything handed out so far.
        if self.bytes_read > self.max_size {
            self.error = true;
            let bytes_read = self.bytes_read;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_read,
                max: max_size,
            })));
        }

        // All declared bytes delivered: finish normally.
        if self.bytes_read >= self.expected_size {
            self.complete = true;
            return Poll::Ready(None);
        }

        let remaining_for_body = self.expected_size.saturating_sub(self.bytes_read);
        let remaining_budget = self.max_size.saturating_sub(self.bytes_read);
        // Bytes are still owed but the budget is spent: fail rather than
        // truncate silently. `received` is bumped by one to signal overflow.
        if remaining_for_body > 0 && remaining_budget == 0 {
            self.error = true;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: self.bytes_read.saturating_add(1),
                max: self.max_size,
            })));
        }

        // Serve bytes that arrived with the request head before touching the
        // reader.
        let initial_remaining = self.initial_remaining();
        if initial_remaining > 0 {
            let chunk_size = self
                .chunk_size
                .min(initial_remaining)
                .min(remaining_for_body)
                .min(remaining_budget);

            if chunk_size > 0 {
                let start = self.initial_position;
                let chunk = self.initial_buffer[start..start + chunk_size].to_vec();
                self.initial_position += chunk_size;
                self.bytes_read += chunk_size;
                return Poll::Ready(Some(Ok(chunk)));
            }
        }

        let remaining = self.expected_size.saturating_sub(self.bytes_read);
        let to_read = self.chunk_size.min(remaining).min(remaining_budget);

        if to_read == 0 {
            self.complete = true;
            return Poll::Ready(None);
        }

        // Grow the scratch buffer if needed for this read.
        if self.read_buffer.len() < to_read {
            self.read_buffer.resize(to_read, 0);
        }

        // Take the reader out so it can be polled while `self` is borrowed.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.error = true;
                return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
            }
        };

        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..to_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let n = read_buf.filled().len();
                    let chunk = read_buf.filled().to_vec();
                    Poll::Ready(Ok((n, chunk)))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };

        match read_result {
            Poll::Ready(Ok((n, chunk))) => {
                if n == 0 {
                    // EOF before the declared length arrived; the reader is
                    // intentionally not restored (connection is done).
                    self.error = true;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
                }

                self.bytes_read += n;

                // Restore the reader for the next poll.
                self.reader = Some(reader);

                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Err(e)) => {
                self.error = true;
                Poll::Ready(Some(Err(RequestBodyStreamError::Io(e.to_string()))))
            }
            Poll::Pending => {
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
900
/// States of the async chunked-decoding state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncChunkedState {
    /// Expecting a chunk-size line (hex digits, optional extension, CRLF).
    ChunkSize,
    /// Yielding chunk payload; `remaining` bytes of the current chunk left.
    ChunkData { remaining: usize },
    /// Expecting the CRLF that terminates a chunk's payload.
    ChunkDataEnd,
    /// Consuming optional trailer lines after the zero-size chunk.
    Trailers,
    /// Final CRLF seen; the stream is exhausted.
    Complete,
    /// A failure occurred; the stream is fused.
    Error,
}
917
/// Async stream that decodes a chunked body incrementally, yielding decoded
/// payload pieces as they become available.
pub struct AsyncChunkedStream<R> {
    /// Underlying reader; taken out while polling.
    #[allow(dead_code)]
    reader: Option<R>,
    /// Current position in the decoding state machine.
    state: AsyncChunkedState,
    /// Total decoded payload bytes yielded so far.
    bytes_decoded: usize,
    /// Hard cap on total decoded bytes.
    max_size: usize,
    /// Maximum size of each yielded chunk and of each read from the reader.
    chunk_size: usize,
    /// Scratch buffer for reads from the reader.
    #[allow(dead_code)]
    read_buffer: Vec<u8>,
    /// Accumulated encoded bytes not yet decoded (seeded with the bytes that
    /// arrived with the request head).
    buffer: Vec<u8>,
    /// Cursor into `buffer`; bytes before it are consumed.
    position: usize,
}
952
impl<R> AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    /// Creates a chunked-body stream seeded with `initial_buffer`.
    ///
    /// # Panics
    ///
    /// Panics when `initial_buffer` already exceeds the configured maximum
    /// body size; use [`try_new`](Self::try_new) for a fallible variant.
    pub fn new(initial_buffer: Vec<u8>, reader: R, config: &StreamingBodyConfig) -> Self {
        assert!(
            initial_buffer.len() <= config.max_size,
            "initial buffer size {} exceeds max size {}",
            initial_buffer.len(),
            config.max_size
        );
        Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        }
    }

    /// Fallible constructor: rejects an oversized `initial_buffer` with an
    /// error instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns `RequestBodyStreamError::Io` when `initial_buffer` exceeds the
    /// configured maximum body size.
    pub fn try_new(
        initial_buffer: Vec<u8>,
        reader: R,
        config: &StreamingBodyConfig,
    ) -> Result<Self, RequestBodyStreamError> {
        if initial_buffer.len() > config.max_size {
            return Err(RequestBodyStreamError::Io(format!(
                "initial buffer size {} exceeds max size {}",
                initial_buffer.len(),
                config.max_size
            )));
        }
        Ok(Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        })
    }

    /// Convenience constructor using [`StreamingBodyConfig::default`].
    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R) -> Self {
        Self::new(initial_buffer, reader, &StreamingBodyConfig::default())
    }

    /// Total decoded payload bytes yielded so far.
    #[must_use]
    pub fn bytes_decoded(&self) -> usize {
        self.bytes_decoded
    }

    /// Whether the terminating zero-size chunk and final CRLF were consumed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.state == AsyncChunkedState::Complete
    }

    /// Slice of internal buffer not yet consumed by the state machine.
    fn buffer_remaining(&self) -> &[u8] {
        &self.buffer[self.position..]
    }

    /// Marks `n` buffered bytes as consumed.
    fn consume(&mut self, n: usize) {
        self.position += n;
    }

    /// Reclaims memory held by already-consumed bytes.
    fn compact_buffer_if_needed(&mut self) {
        if self.position == 0 {
            return;
        }
        if self.position >= self.buffer.len() {
            self.buffer.clear();
            self.position = 0;
            return;
        }

        // Compact only when the dead prefix is large absolutely (>8 KiB) or
        // relatively (more than half the buffer), to avoid repeated O(n)
        // drains on every small consume.
        let should_compact = self.position > 8 * 1024 || self.position > (self.buffer.len() / 2);
        if should_compact {
            self.buffer.drain(..self.position);
            self.position = 0;
        }
    }

    /// Reads up to `max_read` bytes from the reader and appends them to the
    /// internal buffer. On success the returned count is always > 0; a
    /// zero-byte read (EOF mid-message) is converted to `ConnectionClosed`.
    fn poll_read_more_sized(
        &mut self,
        cx: &mut Context<'_>,
        max_read: usize,
    ) -> Poll<Result<usize, RequestBodyStreamError>> {
        self.compact_buffer_if_needed();

        let max_read = max_read.min(self.read_buffer.len());
        if max_read == 0 {
            self.state = AsyncChunkedState::Error;
            return Poll::Ready(Err(RequestBodyStreamError::Io(
                "invalid read buffer size".to_string(),
            )));
        }

        // Take the reader out so it can be polled while `self` is borrowed.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.state = AsyncChunkedState::Error;
                return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
            }
        };

        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..max_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let filled = read_buf.filled();
                    Poll::Ready(Ok(filled.len()))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };

        match read_result {
            Poll::Ready(Ok(n)) => {
                if n == 0 {
                    self.state = AsyncChunkedState::Error;
                    self.reader = Some(reader);
                    return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
                }
                self.buffer.extend_from_slice(&self.read_buffer[..n]);
                self.reader = Some(reader);
                Poll::Ready(Ok(n))
            }
            Poll::Ready(Err(e)) => {
                self.state = AsyncChunkedState::Error;
                self.reader = Some(reader);
                Poll::Ready(Err(RequestBodyStreamError::Io(e.to_string())))
            }
            Poll::Pending => {
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
1120
impl<R> Stream for AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    /// Drives the chunked-decoding state machine, yielding decoded payload
    /// pieces. Returns `Pending` only when the underlying reader is pending.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fused after completion or any error.
        if self.state == AsyncChunkedState::Complete || self.state == AsyncChunkedState::Error {
            return Poll::Ready(None);
        }

        loop {
            match self.state {
                AsyncChunkedState::ChunkSize => {
                    let remaining = self.buffer_remaining();
                    // A full size line is available once a CRLF is buffered.
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        let size_line = &remaining[..crlf_pos];

                        // Strip an optional chunk extension (";name=value").
                        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';')
                        {
                            &size_line[..semi]
                        } else {
                            size_line
                        };

                        let size_str = match std::str::from_utf8(size_str) {
                            Ok(s) => s.trim(),
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid UTF-8 in chunk size".to_string(),
                                ))));
                            }
                        };

                        let chunk_size = match usize::from_str_radix(size_str, 16) {
                            Ok(s) => s,
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid hex chunk size".to_string(),
                                ))));
                            }
                        };

                        // Enforce the total-body cap before accepting data.
                        if chunk_size > 0
                            && self.bytes_decoded.saturating_add(chunk_size) > self.max_size
                        {
                            self.state = AsyncChunkedState::Error;
                            let bytes_decoded = self.bytes_decoded;
                            let max_size = self.max_size;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                                received: bytes_decoded,
                                max: max_size,
                            })));
                        }

                        // Reject absurd single chunks independently of the cap.
                        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
                        if chunk_size > MAX_SINGLE_CHUNK {
                            self.state = AsyncChunkedState::Error;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                "chunk size exceeds 16MB limit".to_string(),
                            ))));
                        }

                        self.consume(crlf_pos + 2);

                        if chunk_size == 0 {
                            // Zero-size chunk terminates the body.
                            self.state = AsyncChunkedState::Trailers;
                            continue;
                        }

                        self.state = AsyncChunkedState::ChunkData {
                            remaining: chunk_size,
                        };
                        continue;
                    }

                    // Bound the size line so a hostile peer can't grow the
                    // buffer without ever sending CRLF.
                    const MAX_CHUNK_SIZE_LINE: usize = 1024;
                    if remaining.len() > MAX_CHUNK_SIZE_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "chunk size line too long".to_string(),
                        ))));
                    }

                    // Need more input to find the CRLF; read a byte at a time.
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkData { remaining } => {
                    // Budget exhausted while payload is still owed.
                    if remaining > 0 && self.bytes_decoded >= self.max_size {
                        self.state = AsyncChunkedState::Error;
                        let bytes_decoded = self.bytes_decoded;
                        let max_size = self.max_size;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                            received: bytes_decoded,
                            max: max_size,
                        })));
                    }

                    let buffer_remaining = self.buffer_remaining();
                    let to_read = remaining.min(buffer_remaining.len()).min(self.chunk_size);

                    if to_read > 0 {
                        // Yield whatever slice of the chunk is buffered.
                        let chunk = buffer_remaining[..to_read].to_vec();
                        self.consume(to_read);
                        self.bytes_decoded += to_read;

                        let new_remaining = remaining - to_read;
                        if new_remaining == 0 {
                            self.state = AsyncChunkedState::ChunkDataEnd;
                        } else {
                            self.state = AsyncChunkedState::ChunkData {
                                remaining: new_remaining,
                            };
                        }

                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    // Nothing buffered: pull more payload from the reader.
                    let want = remaining.min(self.chunk_size).max(1);
                    match self.poll_read_more_sized(cx, want) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkDataEnd => {
                    // Every chunk's payload must be followed by CRLF.
                    let remaining = self.buffer_remaining();
                    if remaining.len() >= 2 {
                        if &remaining[..2] == b"\r\n" {
                            self.consume(2);
                            self.state = AsyncChunkedState::ChunkSize;
                            continue;
                        }
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "expected CRLF after chunk data".to_string(),
                        ))));
                    }

                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Trailers => {
                    let remaining = self.buffer_remaining();

                    // Blank line: end of trailers, body complete.
                    if remaining.len() >= 2 && &remaining[..2] == b"\r\n" {
                        self.consume(2);
                        self.state = AsyncChunkedState::Complete;
                        return Poll::Ready(None);
                    }

                    // Bound trailer lines, same rationale as the size line.
                    const MAX_TRAILER_LINE: usize = 8 * 1024;
                    if remaining.len() > MAX_TRAILER_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "trailer line too long".to_string(),
                        ))));
                    }

                    // Skip a complete trailer line (content is discarded).
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        self.consume(crlf_pos + 2);
                        continue;
                    }

                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Complete | AsyncChunkedState::Error => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1330
1331pub fn create_content_length_stream<R>(
1343 initial_buffer: Vec<u8>,
1344 reader: R,
1345 content_length: usize,
1346 config: &StreamingBodyConfig,
1347) -> fastapi_core::Body
1348where
1349 R: AsyncRead + Unpin + Send + Sync + 'static,
1350{
1351 let stream = AsyncContentLengthStream::new(initial_buffer, reader, content_length, config);
1352 fastapi_core::Body::streaming_with_size(stream, content_length)
1353}
1354
1355pub fn create_chunked_stream<R>(
1365 initial_buffer: Vec<u8>,
1366 reader: R,
1367 config: &StreamingBodyConfig,
1368) -> fastapi_core::Body
1369where
1370 R: AsyncRead + Unpin + Send + Sync + 'static,
1371{
1372 let stream = AsyncChunkedStream::new(initial_buffer, reader, config);
1373 fastapi_core::Body::streaming(stream)
1374}
1375
1376#[cfg(test)]
1381mod tests {
1382 use super::*;
1383
1384 #[test]
1389 fn body_config_defaults() {
1390 let config = BodyConfig::default();
1391 assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
1392 assert_eq!(config.initial_capacity(), 4096);
1393 }
1394
1395 #[test]
1396 fn body_config_custom() {
1397 let config = BodyConfig::new()
1398 .with_max_size(2048)
1399 .with_initial_capacity(1024);
1400 assert_eq!(config.max_size(), 2048);
1401 assert_eq!(config.initial_capacity(), 1024);
1402 }
1403
1404 #[test]
1409 fn content_length_basic() {
1410 let body = b"Hello, World!";
1411 let config = BodyConfig::default();
1412 let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1413
1414 assert_eq!(reader.length(), 13);
1415 assert_eq!(reader.remaining(), 13);
1416 assert!(!reader.is_complete());
1417
1418 let result = reader.read_all().unwrap();
1419 assert_eq!(result, b"Hello, World!");
1420 assert!(reader.is_complete());
1421 }
1422
1423 #[test]
1424 fn content_length_zero() {
1425 let body = b"";
1426 let config = BodyConfig::default();
1427 let mut reader = ContentLengthReader::new(body, 0, &config).unwrap();
1428
1429 assert_eq!(reader.length(), 0);
1430 assert!(reader.is_complete());
1431
1432 let result = reader.read_all().unwrap();
1433 assert!(result.is_empty());
1434 }
1435
1436 #[test]
1437 fn content_length_too_large() {
1438 let body = b"small";
1439 let config = BodyConfig::new().with_max_size(3);
1440 let result = ContentLengthReader::new(body, 100, &config);
1441
1442 assert!(matches!(
1443 result,
1444 Err(BodyError::TooLarge { size: 100, max: 3 })
1445 ));
1446 }
1447
1448 #[test]
1449 fn content_length_incomplete() {
1450 let body = b"Hello";
1451 let config = BodyConfig::default();
1452 let mut reader = ContentLengthReader::new(body, 10, &config).unwrap();
1453
1454 let result = reader.read_all();
1455 assert!(matches!(
1456 result,
1457 Err(BodyError::Incomplete {
1458 received: 5,
1459 expected: Some(10)
1460 })
1461 ));
1462 }
1463
1464 #[test]
1465 fn content_length_borrowed() {
1466 let body = b"Hello, World!";
1467 let config = BodyConfig::default();
1468 let reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1469
1470 let borrowed = reader.read_all_borrowed().unwrap();
1471 assert_eq!(borrowed, body);
1472 assert_eq!(borrowed.as_ptr(), body.as_ptr());
1474 }
1475
1476 #[test]
1477 fn content_length_incremental_read() {
1478 let body = b"Hello, World!";
1479 let config = BodyConfig::default();
1480 let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1481
1482 let mut buf = [0u8; 5];
1483
1484 let n = reader.read(&mut buf).unwrap();
1486 assert_eq!(n, 5);
1487 assert_eq!(&buf[..n], b"Hello");
1488 assert_eq!(reader.remaining(), 8);
1489
1490 let n = reader.read(&mut buf).unwrap();
1492 assert_eq!(n, 5);
1493 assert_eq!(&buf[..n], b", Wor");
1494 assert_eq!(reader.remaining(), 3);
1495
1496 let n = reader.read(&mut buf).unwrap();
1498 assert_eq!(n, 3);
1499 assert_eq!(&buf[..n], b"ld!");
1500 assert!(reader.is_complete());
1501
1502 let n = reader.read(&mut buf).unwrap();
1504 assert_eq!(n, 0);
1505 }
1506
1507 #[test]
1512 fn chunked_single_chunk() {
1513 let body = b"5\r\nHello\r\n0\r\n\r\n";
1514 let config = BodyConfig::default();
1515 let mut reader = ChunkedReader::new(body, &config);
1516
1517 let result = reader.decode_all().unwrap();
1518 assert_eq!(result, b"Hello");
1519 assert!(reader.is_complete());
1520 }
1521
1522 #[test]
1523 fn chunked_multiple_chunks() {
1524 let body = b"5\r\nHello\r\n7\r\n, World\r\n1\r\n!\r\n0\r\n\r\n";
1525 let config = BodyConfig::default();
1526 let mut reader = ChunkedReader::new(body, &config);
1527
1528 let result = reader.decode_all().unwrap();
1529 assert_eq!(result, b"Hello, World!");
1530 assert!(reader.is_complete());
1531 }
1532
1533 #[test]
1534 fn chunked_empty() {
1535 let body = b"0\r\n\r\n";
1536 let config = BodyConfig::default();
1537 let mut reader = ChunkedReader::new(body, &config);
1538
1539 let result = reader.decode_all().unwrap();
1540 assert!(result.is_empty());
1541 assert!(reader.is_complete());
1542 }
1543
1544 #[test]
1545 fn chunked_with_extension() {
1546 let body = b"5;ext=value\r\nHello\r\n0\r\n\r\n";
1548 let config = BodyConfig::default();
1549 let mut reader = ChunkedReader::new(body, &config);
1550
1551 let result = reader.decode_all().unwrap();
1552 assert_eq!(result, b"Hello");
1553 }
1554
1555 #[test]
1556 fn chunked_with_trailers() {
1557 let body = b"5\r\nHello\r\n0\r\nTrailer: value\r\n\r\n";
1558 let config = BodyConfig::default();
1559 let mut reader = ChunkedReader::new(body, &config);
1560
1561 let result = reader.decode_all().unwrap();
1562 assert_eq!(result, b"Hello");
1563 assert!(reader.is_complete());
1564 }
1565
1566 #[test]
1567 fn chunked_hex_sizes() {
1568 let body = b"a\r\n0123456789\r\nF\r\n0123456789ABCDE\r\n0\r\n\r\n";
1570 let config = BodyConfig::default();
1571 let mut reader = ChunkedReader::new(body, &config);
1572
1573 let result = reader.decode_all().unwrap();
1574 assert_eq!(result.len(), 10 + 15); }
1576
1577 #[test]
1578 fn chunked_too_large() {
1579 let body = b"10\r\n0123456789ABCDEF\r\n0\r\n\r\n"; let config = BodyConfig::new().with_max_size(10);
1581 let mut reader = ChunkedReader::new(body, &config);
1582
1583 let result = reader.decode_all();
1584 assert!(matches!(
1585 result,
1586 Err(BodyError::TooLarge { size: 16, max: 10 })
1587 ));
1588 }
1589
1590 #[test]
1591 fn chunked_invalid_size() {
1592 let body = b"xyz\r\nHello\r\n0\r\n\r\n";
1593 let config = BodyConfig::default();
1594 let mut reader = ChunkedReader::new(body, &config);
1595
1596 let result = reader.decode_all();
1597 assert!(matches!(
1598 result,
1599 Err(BodyError::InvalidChunkedEncoding { detail: _ })
1600 ));
1601 }
1602
1603 #[test]
1604 fn chunked_missing_crlf() {
1605 let body = b"5\r\nHelloX0\r\n\r\n"; let config = BodyConfig::default();
1607 let mut reader = ChunkedReader::new(body, &config);
1608
1609 let result = reader.decode_all();
1610 assert!(matches!(
1611 result,
1612 Err(BodyError::InvalidChunkedEncoding {
1613 detail: "expected CRLF after chunk data"
1614 })
1615 ));
1616 }
1617
1618 #[test]
1619 fn chunked_incomplete() {
1620 let body = b"5\r\nHel"; let config = BodyConfig::default();
1622 let mut reader = ChunkedReader::new(body, &config);
1623
1624 let result = reader.decode_all();
1625 assert!(matches!(result, Err(BodyError::Incomplete { .. })));
1626 }
1627
1628 #[test]
1633 fn parse_body_none() {
1634 let config = BodyConfig::default();
1635 let result = parse_body(b"ignored", BodyLength::None, &config).unwrap();
1636 assert!(result.is_none());
1637 }
1638
1639 #[test]
1640 fn parse_body_content_length() {
1641 let config = BodyConfig::default();
1642 let result = parse_body(b"Hello, World!", BodyLength::ContentLength(13), &config).unwrap();
1643 assert_eq!(result.unwrap(), b"Hello, World!");
1644 }
1645
1646 #[test]
1647 fn parse_body_content_length_zero() {
1648 let config = BodyConfig::default();
1649 let result = parse_body(b"", BodyLength::ContentLength(0), &config).unwrap();
1650 assert_eq!(result.unwrap(), b"");
1651 }
1652
1653 #[test]
1654 fn parse_body_chunked() {
1655 let config = BodyConfig::default();
1656 let result = parse_body(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config).unwrap();
1657 assert_eq!(result.unwrap(), b"Hello");
1658 }
1659
1660 #[test]
1661 fn parse_body_with_consumed_content_length() {
1662 let config = BodyConfig::default();
1663 let (body, consumed) =
1664 parse_body_with_consumed(b"Hello, World!", BodyLength::ContentLength(13), &config)
1665 .unwrap();
1666 assert_eq!(body.unwrap(), b"Hello, World!");
1667 assert_eq!(consumed, 13);
1668 }
1669
1670 #[test]
1671 fn parse_body_with_consumed_chunked() {
1672 let config = BodyConfig::default();
1673 let (body, consumed) =
1674 parse_body_with_consumed(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config)
1675 .unwrap();
1676 assert_eq!(body.unwrap(), b"Hello");
1677 assert_eq!(consumed, 15);
1678 }
1679
1680 #[test]
1685 fn validate_content_length_ok() {
1686 let config = BodyConfig::new().with_max_size(1000);
1687 assert!(validate_content_length(500, &config).is_ok());
1688 assert!(validate_content_length(1000, &config).is_ok());
1689 }
1690
1691 #[test]
1692 fn validate_content_length_too_large() {
1693 let config = BodyConfig::new().with_max_size(1000);
1694 let result = validate_content_length(1001, &config);
1695 assert!(matches!(
1696 result,
1697 Err(BodyError::TooLarge {
1698 size: 1001,
1699 max: 1000
1700 })
1701 ));
1702 }
1703
1704 #[test]
1709 fn body_error_display() {
1710 let err = BodyError::TooLarge {
1711 size: 2000,
1712 max: 1000,
1713 };
1714 assert_eq!(
1715 format!("{err}"),
1716 "body too large: 2000 bytes exceeds limit of 1000"
1717 );
1718
1719 let err = BodyError::InvalidChunkedEncoding {
1720 detail: "bad format",
1721 };
1722 assert_eq!(format!("{err}"), "invalid chunked encoding: bad format");
1723
1724 let err = BodyError::Incomplete {
1725 received: 50,
1726 expected: Some(100),
1727 };
1728 assert_eq!(
1729 format!("{err}"),
1730 "incomplete body: received 50 of 100 bytes"
1731 );
1732
1733 let err = BodyError::UnexpectedEof;
1734 assert_eq!(format!("{err}"), "unexpected end of body");
1735 }
1736
1737 #[test]
1742 fn streaming_body_config_defaults() {
1743 let config = StreamingBodyConfig::default();
1744 assert_eq!(config.streaming_threshold, DEFAULT_STREAMING_THRESHOLD);
1745 assert_eq!(config.chunk_size, 8 * 1024);
1746 assert_eq!(config.max_size, DEFAULT_MAX_BODY_SIZE);
1747 }
1748
1749 #[test]
1750 fn streaming_body_config_custom() {
1751 let config = StreamingBodyConfig::new()
1752 .with_streaming_threshold(1024)
1753 .with_chunk_size(4096)
1754 .with_max_size(10_000);
1755 assert_eq!(config.streaming_threshold, 1024);
1756 assert_eq!(config.chunk_size, 4096);
1757 assert_eq!(config.max_size, 10_000);
1758 }
1759
1760 #[test]
1761 fn streaming_body_config_minimum_chunk_size() {
1762 let config = StreamingBodyConfig::new().with_chunk_size(0);
1763 assert_eq!(config.chunk_size, 1);
1765 }
1766
1767 #[test]
1768 fn streaming_body_config_should_stream() {
1769 let config = StreamingBodyConfig::new().with_streaming_threshold(1000);
1770 assert!(!config.should_stream(500));
1771 assert!(!config.should_stream(1000));
1772 assert!(config.should_stream(1001));
1773 assert!(config.should_stream(10000));
1774 }
1775
1776 #[test]
1781 fn async_content_length_stream_from_buffer() {
1782 use std::sync::Arc;
1783 use std::task::{Wake, Waker};
1784
1785 struct NoopWaker;
1786 impl Wake for NoopWaker {
1787 fn wake(self: Arc<Self>) {}
1788 }
1789
1790 fn noop_waker() -> Waker {
1791 Waker::from(Arc::new(NoopWaker))
1792 }
1793
1794 struct EmptyReader;
1796 impl AsyncRead for EmptyReader {
1797 fn poll_read(
1798 self: Pin<&mut Self>,
1799 _cx: &mut Context<'_>,
1800 _buf: &mut asupersync::io::ReadBuf<'_>,
1801 ) -> Poll<std::io::Result<()>> {
1802 Poll::Ready(Ok(()))
1803 }
1804 }
1805
1806 let buffer = b"Hello, World!".to_vec();
1807 let config = StreamingBodyConfig::new().with_chunk_size(5);
1808 let mut stream = AsyncContentLengthStream::new(buffer, EmptyReader, 13, &config);
1809
1810 assert_eq!(stream.expected_size(), 13);
1811 assert_eq!(stream.bytes_read(), 0);
1812 assert_eq!(stream.remaining(), 13);
1813
1814 let waker = noop_waker();
1815 let mut cx = Context::from_waker(&waker);
1816
1817 let result = Pin::new(&mut stream).poll_next(&mut cx);
1819 match result {
1820 Poll::Ready(Some(Ok(chunk))) => {
1821 assert_eq!(chunk, b"Hello");
1822 }
1823 _ => panic!("expected chunk"),
1824 }
1825 assert_eq!(stream.bytes_read(), 5);
1826
1827 let result = Pin::new(&mut stream).poll_next(&mut cx);
1829 match result {
1830 Poll::Ready(Some(Ok(chunk))) => {
1831 assert_eq!(chunk, b", Wor");
1832 }
1833 _ => panic!("expected chunk"),
1834 }
1835
1836 let result = Pin::new(&mut stream).poll_next(&mut cx);
1838 match result {
1839 Poll::Ready(Some(Ok(chunk))) => {
1840 assert_eq!(chunk, b"ld!");
1841 }
1842 _ => panic!("expected chunk"),
1843 }
1844
1845 let result = Pin::new(&mut stream).poll_next(&mut cx);
1847 assert!(matches!(result, Poll::Ready(None)));
1848 assert!(stream.is_complete());
1849 }
1850
1851 #[test]
1852 fn async_content_length_stream_enforces_max_size() {
1853 use std::io::Cursor;
1854 use std::sync::Arc;
1855 use std::task::{Wake, Waker};
1856
1857 struct NoopWaker;
1858 impl Wake for NoopWaker {
1859 fn wake(self: Arc<Self>) {}
1860 }
1861
1862 fn noop_waker() -> Waker {
1863 Waker::from(Arc::new(NoopWaker))
1864 }
1865
1866 let initial = b"123456".to_vec();
1867 let reader = Cursor::new(b"abcdef".to_vec());
1868 let config = StreamingBodyConfig::new()
1869 .with_chunk_size(8)
1870 .with_max_size(10);
1871 let mut stream = AsyncContentLengthStream::new(initial, reader, 12, &config);
1872
1873 let waker = noop_waker();
1874 let mut cx = Context::from_waker(&waker);
1875
1876 let result = Pin::new(&mut stream).poll_next(&mut cx);
1878 match result {
1879 Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"123456"),
1880 _ => panic!("expected initial chunk"),
1881 }
1882
1883 let result = Pin::new(&mut stream).poll_next(&mut cx);
1885 match result {
1886 Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"abcd"),
1887 _ => panic!("expected bounded reader chunk"),
1888 }
1889
1890 let result = Pin::new(&mut stream).poll_next(&mut cx);
1892 match result {
1893 Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge { received, max }))) => {
1894 assert_eq!(received, 11);
1895 assert_eq!(max, 10);
1896 }
1897 _ => panic!("expected TooLarge error, got {:?}", result),
1898 }
1899 }
1900
1901 #[test]
1906 fn async_chunked_stream_simple() {
1907 use std::sync::Arc;
1908 use std::task::{Wake, Waker};
1909
1910 struct NoopWaker;
1911 impl Wake for NoopWaker {
1912 fn wake(self: Arc<Self>) {}
1913 }
1914
1915 fn noop_waker() -> Waker {
1916 Waker::from(Arc::new(NoopWaker))
1917 }
1918
1919 struct EmptyReader;
1920 impl AsyncRead for EmptyReader {
1921 fn poll_read(
1922 self: Pin<&mut Self>,
1923 _cx: &mut Context<'_>,
1924 _buf: &mut asupersync::io::ReadBuf<'_>,
1925 ) -> Poll<std::io::Result<()>> {
1926 Poll::Ready(Ok(()))
1927 }
1928 }
1929
1930 let buffer = b"5\r\nHello\r\n0\r\n\r\n".to_vec();
1932 let config = StreamingBodyConfig::new().with_chunk_size(1024);
1933 let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
1934
1935 let waker = noop_waker();
1936 let mut cx = Context::from_waker(&waker);
1937
1938 let result = Pin::new(&mut stream).poll_next(&mut cx);
1940 match result {
1941 Poll::Ready(Some(Ok(chunk))) => {
1942 assert_eq!(chunk, b"Hello");
1943 }
1944 _ => panic!("expected chunk, got {:?}", result),
1945 }
1946
1947 let result = Pin::new(&mut stream).poll_next(&mut cx);
1950 assert!(matches!(result, Poll::Ready(None)));
1952 assert!(stream.is_complete());
1953 }
1954
1955 #[test]
1956 fn async_chunked_stream_multiple_chunks() {
1957 use std::sync::Arc;
1958 use std::task::{Wake, Waker};
1959
1960 struct NoopWaker;
1961 impl Wake for NoopWaker {
1962 fn wake(self: Arc<Self>) {}
1963 }
1964
1965 fn noop_waker() -> Waker {
1966 Waker::from(Arc::new(NoopWaker))
1967 }
1968
1969 struct EmptyReader;
1970 impl AsyncRead for EmptyReader {
1971 fn poll_read(
1972 self: Pin<&mut Self>,
1973 _cx: &mut Context<'_>,
1974 _buf: &mut asupersync::io::ReadBuf<'_>,
1975 ) -> Poll<std::io::Result<()>> {
1976 Poll::Ready(Ok(()))
1977 }
1978 }
1979
1980 let buffer = b"5\r\nHello\r\n8\r\n, World!\r\n0\r\n\r\n".to_vec();
1982 let config = StreamingBodyConfig::new();
1983 let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
1984
1985 let waker = noop_waker();
1986 let mut cx = Context::from_waker(&waker);
1987
1988 let mut collected = Vec::new();
1990 loop {
1991 match Pin::new(&mut stream).poll_next(&mut cx) {
1992 Poll::Ready(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
1993 Poll::Ready(Some(Err(e))) => panic!("unexpected error: {e}"),
1994 Poll::Ready(None) => break,
1995 Poll::Pending => {} }
1997 }
1998
1999 assert_eq!(collected, b"Hello, World!");
2000 }
2001}