//! Request body handling: `Content-Length` and chunked decoding from an
//! in-memory buffer, plus async streaming variants for larger bodies.

use crate::parser::{BodyLength, ParseError};

/// Default maximum body size: 1 MiB.
pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;

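/// Configuration for reading request bodies.
///
/// Illustrative builder usage (a sketch; the values are examples only and the
/// import path depends on where this module lives in the crate):
///
/// ```ignore
/// let config = BodyConfig::new()
///     .with_max_size(8 * 1024 * 1024)    // allow bodies up to 8 MiB
///     .with_initial_capacity(16 * 1024); // pre-allocate 16 KiB for decoding
/// assert_eq!(config.max_size(), 8 * 1024 * 1024);
/// ```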
#[derive(Debug, Clone)]
pub struct BodyConfig {
    max_size: usize,
    initial_capacity: usize,
}

impl Default for BodyConfig {
    fn default() -> Self {
        Self {
            max_size: DEFAULT_MAX_BODY_SIZE,
            initial_capacity: 4096,
        }
    }
}

impl BodyConfig {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    #[must_use]
    pub fn with_max_size(mut self, size: usize) -> Self {
        self.max_size = size;
        self
    }

    #[must_use]
    pub fn with_initial_capacity(mut self, capacity: usize) -> Self {
        self.initial_capacity = capacity;
        self
    }

    #[must_use]
    pub fn max_size(&self) -> usize {
        self.max_size
    }

    #[must_use]
    pub fn initial_capacity(&self) -> usize {
        self.initial_capacity
    }
}

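/// Errors produced while reading or decoding a request body.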
#[derive(Debug)]
pub enum BodyError {
    TooLarge {
        size: usize,
        max: usize,
    },
    InvalidChunkedEncoding {
        detail: &'static str,
    },
    Incomplete {
        received: usize,
        expected: Option<usize>,
    },
    UnexpectedEof,
    Parse(ParseError),
}

impl std::fmt::Display for BodyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TooLarge { size, max } => {
                write!(f, "body too large: {size} bytes exceeds limit of {max}")
            }
            Self::InvalidChunkedEncoding { detail } => {
                write!(f, "invalid chunked encoding: {detail}")
            }
            Self::Incomplete { received, expected } => {
                if let Some(exp) = expected {
                    write!(f, "incomplete body: received {received} of {exp} bytes")
                } else {
                    write!(f, "incomplete body: received {received} bytes")
                }
            }
            Self::UnexpectedEof => write!(f, "unexpected end of body"),
            Self::Parse(e) => write!(f, "parse error: {e}"),
        }
    }
}

impl std::error::Error for BodyError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Parse(e) => Some(e),
            _ => None,
        }
    }
}

impl From<ParseError> for BodyError {
    fn from(e: ParseError) -> Self {
        Self::Parse(e)
    }
}

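/// Reads a fixed-size (`Content-Length`) body from an in-memory buffer.
///
/// A minimal sketch of intended usage (assuming the body bytes are already
/// buffered; adjust the path to this module's actual location):
///
/// ```ignore
/// let config = BodyConfig::default();
/// let mut reader = ContentLengthReader::new(b"Hello", 5, &config)?;
/// let body = reader.read_all()?;
/// assert_eq!(body, b"Hello");
/// ```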
#[derive(Debug)]
pub struct ContentLengthReader<'a> {
    buffer: &'a [u8],
    length: usize,
    position: usize,
    #[allow(dead_code)]
    config: BodyConfig,
}

impl<'a> ContentLengthReader<'a> {
    pub fn new(buffer: &'a [u8], length: usize, config: &BodyConfig) -> Result<Self, BodyError> {
        if length > config.max_size {
            return Err(BodyError::TooLarge {
                size: length,
                max: config.max_size,
            });
        }

        Ok(Self {
            buffer,
            length,
            position: 0,
            config: config.clone(),
        })
    }

    #[must_use]
    pub fn length(&self) -> usize {
        self.length
    }

    #[must_use]
    pub fn remaining(&self) -> usize {
        self.length.saturating_sub(self.position)
    }

    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.position >= self.length
    }

    pub fn read(&mut self, dest: &mut [u8]) -> Result<usize, BodyError> {
        if self.is_complete() {
            return Ok(0);
        }

        let available = self.buffer.len().saturating_sub(self.position);
        let to_read = dest.len().min(self.remaining()).min(available);

        if to_read == 0 && !self.is_complete() {
            return Err(BodyError::Incomplete {
                received: self.position,
                expected: Some(self.length),
            });
        }

        dest[..to_read].copy_from_slice(&self.buffer[self.position..self.position + to_read]);
        self.position += to_read;

        Ok(to_read)
    }

    pub fn read_all(&mut self) -> Result<Vec<u8>, BodyError> {
        if self.buffer.len() < self.length {
            return Err(BodyError::Incomplete {
                received: self.buffer.len(),
                expected: Some(self.length),
            });
        }

        let body = self.buffer[..self.length].to_vec();
        self.position = self.length;
        Ok(body)
    }

    pub fn read_all_borrowed(&self) -> Result<&'a [u8], BodyError> {
        if self.buffer.len() < self.length {
            return Err(BodyError::Incomplete {
                received: self.buffer.len(),
                expected: Some(self.length),
            });
        }

        Ok(&self.buffer[..self.length])
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedState {
    ChunkSize,
    ChunkData { remaining: usize },
    ChunkDataEnd,
    Trailers,
    Complete,
}

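/// Decodes a `Transfer-Encoding: chunked` body from an in-memory buffer.
///
/// Illustrative usage (a sketch, not a prescribed call sequence):
///
/// ```ignore
/// let config = BodyConfig::default();
/// let mut reader = ChunkedReader::new(b"5\r\nHello\r\n0\r\n\r\n", &config);
/// let body = reader.decode_all()?;
/// assert_eq!(body, b"Hello");
/// ```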
#[derive(Debug)]
pub struct ChunkedReader<'a> {
    buffer: &'a [u8],
    position: usize,
    state: ChunkedState,
    total_size: usize,
    config: BodyConfig,
}

impl<'a> ChunkedReader<'a> {
    #[must_use]
    pub fn new(buffer: &'a [u8], config: &BodyConfig) -> Self {
        Self {
            buffer,
            position: 0,
            state: ChunkedState::ChunkSize,
            total_size: 0,
            config: config.clone(),
        }
    }

    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.state == ChunkedState::Complete
    }

    #[must_use]
    pub fn total_size(&self) -> usize {
        self.total_size
    }

    fn parse_chunk_size(&self) -> Result<(usize, usize), BodyError> {
        let remaining = &self.buffer[self.position..];

        let line_end =
            remaining
                .windows(2)
                .position(|w| w == b"\r\n")
                .ok_or(BodyError::Incomplete {
                    received: self.position,
                    expected: None,
                })?;

        let size_line = &remaining[..line_end];

        // Chunk extensions (";name=value") are permitted but ignored.
        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';') {
            &size_line[..semi]
        } else {
            size_line
        };

        let size_str =
            std::str::from_utf8(size_str).map_err(|_| BodyError::InvalidChunkedEncoding {
                detail: "invalid UTF-8 in chunk size",
            })?;

        let size = usize::from_str_radix(size_str.trim(), 16).map_err(|_| {
            BodyError::InvalidChunkedEncoding {
                detail: "invalid hex chunk size",
            }
        })?;

        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
        if size > MAX_SINGLE_CHUNK {
            return Err(BodyError::InvalidChunkedEncoding {
                detail: "chunk size exceeds 16MB limit",
            });
        }

        Ok((size, line_end + 2))
    }

    pub fn decode_all(&mut self) -> Result<Vec<u8>, BodyError> {
        let mut output = Vec::with_capacity(self.config.initial_capacity);

        loop {
            match self.state {
                ChunkedState::ChunkSize => {
                    let (size, consumed) = self.parse_chunk_size()?;
                    self.position += consumed;

                    let new_total = self.total_size.saturating_add(size);
                    if new_total > self.config.max_size {
                        return Err(BodyError::TooLarge {
                            size: new_total,
                            max: self.config.max_size,
                        });
                    }

                    if size == 0 {
                        self.state = ChunkedState::Trailers;
                    } else {
                        self.state = ChunkedState::ChunkData { remaining: size };
                    }
                }
                ChunkedState::ChunkData { remaining } => {
                    let available = self.buffer.len().saturating_sub(self.position);
                    if available < remaining {
                        return Err(BodyError::Incomplete {
                            // Report how many body bytes have actually arrived:
                            // everything decoded so far plus the partial chunk.
                            received: self.total_size + available,
                            expected: None,
                        });
                    }

                    let chunk_data = &self.buffer[self.position..self.position + remaining];
                    output.extend_from_slice(chunk_data);
                    self.position += remaining;
                    self.total_size += remaining;

                    self.state = ChunkedState::ChunkDataEnd;
                }
                ChunkedState::ChunkDataEnd => {
                    let remaining = &self.buffer[self.position..];
                    if remaining.len() < 2 {
                        return Err(BodyError::Incomplete {
                            received: self.total_size,
                            expected: None,
                        });
                    }

                    if &remaining[..2] != b"\r\n" {
                        return Err(BodyError::InvalidChunkedEncoding {
                            detail: "expected CRLF after chunk data",
                        });
                    }

                    self.position += 2;
                    self.state = ChunkedState::ChunkSize;
                }
                ChunkedState::Trailers => {
                    let remaining = &self.buffer[self.position..];

                    if remaining.starts_with(b"\r\n") {
                        self.position += 2;
                        self.state = ChunkedState::Complete;
                    } else {
                        // Skip trailer header lines until the terminating blank line.
                        let line_end = remaining.windows(2).position(|w| w == b"\r\n");
                        match line_end {
                            Some(pos) => {
                                self.position += pos + 2;
                            }
                            None => {
                                return Err(BodyError::Incomplete {
                                    received: self.total_size,
                                    expected: None,
                                });
                            }
                        }
                    }
                }
                ChunkedState::Complete => {
                    break;
                }
            }
        }

        Ok(output)
    }

    #[must_use]
    pub fn bytes_consumed(&self) -> usize {
        self.position
    }
}

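/// Parses a complete request body from `buffer` according to `body_length`.
///
/// Returns `Ok(None)` when the request carries no body. A minimal sketch of
/// intended usage (paths are illustrative):
///
/// ```ignore
/// let config = BodyConfig::default();
/// let body = parse_body(b"Hello", BodyLength::ContentLength(5), &config)?;
/// assert_eq!(body.as_deref(), Some(&b"Hello"[..]));
/// ```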
pub fn parse_body(
    buffer: &[u8],
    body_length: BodyLength,
    config: &BodyConfig,
) -> Result<Option<Vec<u8>>, BodyError> {
    let (body, _) = parse_body_with_consumed(buffer, body_length, config)?;
    Ok(body)
}

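/// Like [`parse_body`], but also returns the number of bytes of `buffer`
/// consumed by the body (for chunked bodies this includes the framing:
/// size lines, CRLFs, and trailers).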
pub fn parse_body_with_consumed(
    buffer: &[u8],
    body_length: BodyLength,
    config: &BodyConfig,
) -> Result<(Option<Vec<u8>>, usize), BodyError> {
    match body_length {
        BodyLength::None => Ok((None, 0)),
        BodyLength::ContentLength(len) => {
            if len == 0 {
                return Ok((Some(Vec::new()), 0));
            }
            let mut reader = ContentLengthReader::new(buffer, len, config)?;
            let body = reader.read_all()?;
            Ok((Some(body), len))
        }
        BodyLength::Chunked => {
            let mut reader = ChunkedReader::new(buffer, config);
            let body = reader.decode_all()?;
            Ok((Some(body), reader.bytes_consumed()))
        }
        BodyLength::Conflicting => Err(BodyError::InvalidChunkedEncoding {
            detail: "conflicting body length indicators",
        }),
    }
}

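/// Checks a declared `Content-Length` against the configured maximum before
/// any body bytes are read.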
pub fn validate_content_length(
    content_length: usize,
    config: &BodyConfig,
) -> Result<(), BodyError> {
    if content_length > config.max_size {
        return Err(BodyError::TooLarge {
            size: content_length,
            max: config.max_size,
        });
    }
    Ok(())
}

// Streaming body support: large or chunked bodies can be exposed as async
// streams instead of being buffered in full.
use asupersync::io::AsyncRead;
use asupersync::stream::Stream;
use fastapi_core::RequestBodyStreamError;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Default threshold above which [`StreamingBodyConfig::should_stream`]
/// reports that a body should be streamed (64 KiB).
pub const DEFAULT_STREAMING_THRESHOLD: usize = 64 * 1024;

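/// Configuration for streaming body reads.
///
/// Illustrative usage (a sketch; the values are examples only):
///
/// ```ignore
/// let config = StreamingBodyConfig::new()
///     .with_streaming_threshold(32 * 1024) // stream bodies larger than 32 KiB
///     .with_chunk_size(16 * 1024)          // emit 16 KiB chunks
///     .with_max_size(4 * 1024 * 1024);     // reject bodies above 4 MiB
/// assert!(config.should_stream(64 * 1024));
/// ```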
#[derive(Debug, Clone)]
pub struct StreamingBodyConfig {
    pub streaming_threshold: usize,
    pub chunk_size: usize,
    pub max_size: usize,
}

impl Default for StreamingBodyConfig {
    fn default() -> Self {
        Self {
            streaming_threshold: DEFAULT_STREAMING_THRESHOLD,
            chunk_size: 8 * 1024,
            max_size: DEFAULT_MAX_BODY_SIZE,
        }
    }
}

impl StreamingBodyConfig {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    #[must_use]
    pub fn with_streaming_threshold(mut self, threshold: usize) -> Self {
        self.streaming_threshold = threshold;
        self
    }

    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        // Clamp to at least 1 byte so reads always make progress.
        self.chunk_size = size.max(1);
        self
    }

    #[must_use]
    pub fn with_max_size(mut self, size: usize) -> Self {
        self.max_size = size;
        self
    }

    #[must_use]
    pub fn should_stream(&self, content_length: usize) -> bool {
        content_length > self.streaming_threshold
    }
}

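/// A `Stream` of body chunks for a fixed-size (`Content-Length`) body.
///
/// Any bytes supplied in `initial_buffer` are yielded first; the remainder is
/// read asynchronously from the underlying reader in `chunk_size` pieces.
///
/// Illustrative construction (a sketch; `leftover_bytes`, `reader`, and
/// `config` are placeholder names):
///
/// ```ignore
/// let stream =
///     AsyncContentLengthStream::new(leftover_bytes, reader, content_length, &config);
/// ```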
pub struct AsyncContentLengthStream<R> {
    reader: Option<R>,
    initial_buffer: Vec<u8>,
    initial_position: usize,
    expected_size: usize,
    bytes_read: usize,
    chunk_size: usize,
    max_size: usize,
    read_buffer: Vec<u8>,
    complete: bool,
    error: bool,
}

impl<R> AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    pub fn new(
        initial_buffer: Vec<u8>,
        reader: R,
        content_length: usize,
        config: &StreamingBodyConfig,
    ) -> Self {
        Self {
            reader: Some(reader),
            initial_buffer,
            initial_position: 0,
            expected_size: content_length,
            bytes_read: 0,
            chunk_size: config.chunk_size,
            max_size: config.max_size,
            read_buffer: vec![0u8; config.chunk_size],
            complete: false,
            error: false,
        }
    }

    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R, content_length: usize) -> Self {
        Self::new(
            initial_buffer,
            reader,
            content_length,
            &StreamingBodyConfig::default(),
        )
    }

    #[must_use]
    pub fn expected_size(&self) -> usize {
        self.expected_size
    }

    #[must_use]
    pub fn bytes_read(&self) -> usize {
        self.bytes_read
    }

    #[must_use]
    pub fn remaining(&self) -> usize {
        self.expected_size.saturating_sub(self.bytes_read)
    }

    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    fn initial_remaining(&self) -> usize {
        self.initial_buffer
            .len()
            .saturating_sub(self.initial_position)
    }
}

impl<R> Stream for AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.complete || self.error {
            return Poll::Ready(None);
        }

        if self.bytes_read >= self.expected_size {
            self.complete = true;
            return Poll::Ready(None);
        }

        if self.bytes_read > self.max_size {
            self.error = true;
            let bytes_read = self.bytes_read;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_read,
                max: max_size,
            })));
        }

        // Serve bytes from the initial buffer before polling the reader.
        let initial_remaining = self.initial_remaining();
        if initial_remaining > 0 {
            let remaining_for_body = self.expected_size.saturating_sub(self.bytes_read);
            let chunk_size = self
                .chunk_size
                .min(initial_remaining)
                .min(remaining_for_body);

            if chunk_size > 0 {
                let start = self.initial_position;
                let chunk = self.initial_buffer[start..start + chunk_size].to_vec();
                self.initial_position += chunk_size;
                self.bytes_read += chunk_size;
                return Poll::Ready(Some(Ok(chunk)));
            }
        }

        let remaining = self.expected_size.saturating_sub(self.bytes_read);
        let to_read = self.chunk_size.min(remaining);

        if to_read == 0 {
            self.complete = true;
            return Poll::Ready(None);
        }

        if self.read_buffer.len() < to_read {
            self.read_buffer.resize(to_read, 0);
        }

        // Temporarily take the reader so it can be polled while the read
        // buffer is borrowed; it is put back on every non-terminal path.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.error = true;
                return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
            }
        };

        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..to_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let n = read_buf.filled().len();
                    let chunk = read_buf.filled().to_vec();
                    Poll::Ready(Ok((n, chunk)))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };

        match read_result {
            Poll::Ready(Ok((n, chunk))) => {
                if n == 0 {
                    // EOF before the declared Content-Length was satisfied.
                    self.error = true;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
                }

                self.bytes_read += n;

                self.reader = Some(reader);

                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Err(e)) => {
                self.error = true;
                Poll::Ready(Some(Err(RequestBodyStreamError::Io(e.to_string()))))
            }
            Poll::Pending => {
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncChunkedState {
    ChunkSize,
    ChunkData { remaining: usize },
    ChunkDataEnd,
    Complete,
    Error,
}

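/// A `Stream` of decoded body chunks for a `Transfer-Encoding: chunked` body.
///
/// The current implementation decodes only from the initially buffered bytes;
/// the stored reader is not polled for additional data (the field is unused
/// and marked `#[allow(dead_code)]`).
///
/// Illustrative construction (a sketch; `buffered_bytes`, `reader`, and
/// `config` are placeholder names):
///
/// ```ignore
/// let stream = AsyncChunkedStream::try_new(buffered_bytes, reader, &config)?;
/// ```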
pub struct AsyncChunkedStream<R> {
    #[allow(dead_code)]
    reader: Option<R>,
    state: AsyncChunkedState,
    bytes_decoded: usize,
    max_size: usize,
    chunk_size: usize,
    #[allow(dead_code)]
    read_buffer: Vec<u8>,
    buffer: Vec<u8>,
    position: usize,
}

impl<R> AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    pub fn new(initial_buffer: Vec<u8>, reader: R, config: &StreamingBodyConfig) -> Self {
        assert!(
            initial_buffer.len() <= config.max_size,
            "initial buffer size {} exceeds max size {}",
            initial_buffer.len(),
            config.max_size
        );
        Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        }
    }

    pub fn try_new(
        initial_buffer: Vec<u8>,
        reader: R,
        config: &StreamingBodyConfig,
    ) -> Result<Self, RequestBodyStreamError> {
        if initial_buffer.len() > config.max_size {
            return Err(RequestBodyStreamError::Io(format!(
                "initial buffer size {} exceeds max size {}",
                initial_buffer.len(),
                config.max_size
            )));
        }
        Ok(Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        })
    }

    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R) -> Self {
        Self::new(initial_buffer, reader, &StreamingBodyConfig::default())
    }

    #[must_use]
    pub fn bytes_decoded(&self) -> usize {
        self.bytes_decoded
    }

    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.state == AsyncChunkedState::Complete
    }

    fn buffer_remaining(&self) -> &[u8] {
        &self.buffer[self.position..]
    }

    fn consume(&mut self, n: usize) {
        self.position += n;
    }
}

impl<R> Stream for AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.state == AsyncChunkedState::Complete || self.state == AsyncChunkedState::Error {
            return Poll::Ready(None);
        }

        if self.bytes_decoded > self.max_size {
            self.state = AsyncChunkedState::Error;
            let bytes_decoded = self.bytes_decoded;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_decoded,
                max: max_size,
            })));
        }

        loop {
            match self.state {
                AsyncChunkedState::ChunkSize => {
                    let remaining = self.buffer_remaining();
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        let size_line = &remaining[..crlf_pos];

                        // Strip any chunk extension before parsing the hex size.
                        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';')
                        {
                            &size_line[..semi]
                        } else {
                            size_line
                        };

                        let size_str = match std::str::from_utf8(size_str) {
                            Ok(s) => s.trim(),
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid UTF-8 in chunk size".to_string(),
                                ))));
                            }
                        };

                        let chunk_size = match usize::from_str_radix(size_str, 16) {
                            Ok(s) => s,
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid hex chunk size".to_string(),
                                ))));
                            }
                        };

                        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
                        if chunk_size > MAX_SINGLE_CHUNK {
                            self.state = AsyncChunkedState::Error;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                "chunk size exceeds 16MB limit".to_string(),
                            ))));
                        }

                        self.consume(crlf_pos + 2);

                        if chunk_size == 0 {
                            self.state = AsyncChunkedState::Complete;
                            return Poll::Ready(None);
                        }

                        self.state = AsyncChunkedState::ChunkData {
                            remaining: chunk_size,
                        };
                        continue;
                    }

                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete chunk size line".to_string(),
                    ))));
                }
                AsyncChunkedState::ChunkData { remaining } => {
                    let buffer_remaining = self.buffer_remaining();
                    let to_read = remaining.min(buffer_remaining.len()).min(self.chunk_size);

                    if to_read > 0 {
                        let chunk = buffer_remaining[..to_read].to_vec();
                        self.consume(to_read);
                        self.bytes_decoded += to_read;

                        let new_remaining = remaining - to_read;
                        if new_remaining == 0 {
                            self.state = AsyncChunkedState::ChunkDataEnd;
                        } else {
                            self.state = AsyncChunkedState::ChunkData {
                                remaining: new_remaining,
                            };
                        }

                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete chunk data".to_string(),
                    ))));
                }
                AsyncChunkedState::ChunkDataEnd => {
                    let remaining = self.buffer_remaining();
                    if remaining.len() >= 2 {
                        if &remaining[..2] == b"\r\n" {
                            self.consume(2);
                            self.state = AsyncChunkedState::ChunkSize;
                            continue;
                        }
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "expected CRLF after chunk data".to_string(),
                        ))));
                    }

                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete CRLF after chunk".to_string(),
                    ))));
                }
                AsyncChunkedState::Complete | AsyncChunkedState::Error => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}

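/// Wraps an [`AsyncContentLengthStream`] in a `fastapi_core::Body` whose total
/// size is known up front.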
pub fn create_content_length_stream<R>(
    initial_buffer: Vec<u8>,
    reader: R,
    content_length: usize,
    config: &StreamingBodyConfig,
) -> fastapi_core::Body
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    let stream = AsyncContentLengthStream::new(initial_buffer, reader, content_length, config);
    fastapi_core::Body::streaming_with_size(stream, content_length)
}

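/// Wraps an [`AsyncChunkedStream`] in a streaming `fastapi_core::Body` whose
/// total size is not known up front.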
pub fn create_chunked_stream<R>(
    initial_buffer: Vec<u8>,
    reader: R,
    config: &StreamingBodyConfig,
) -> fastapi_core::Body
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    let stream = AsyncChunkedStream::new(initial_buffer, reader, config);
    fastapi_core::Body::streaming(stream)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn body_config_defaults() {
        let config = BodyConfig::default();
        assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
        assert_eq!(config.initial_capacity(), 4096);
    }

    #[test]
    fn body_config_custom() {
        let config = BodyConfig::new()
            .with_max_size(2048)
            .with_initial_capacity(1024);
        assert_eq!(config.max_size(), 2048);
        assert_eq!(config.initial_capacity(), 1024);
    }

    #[test]
    fn content_length_basic() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        assert_eq!(reader.length(), 13);
        assert_eq!(reader.remaining(), 13);
        assert!(!reader.is_complete());

        let result = reader.read_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn content_length_zero() {
        let body = b"";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 0, &config).unwrap();

        assert_eq!(reader.length(), 0);
        assert!(reader.is_complete());

        let result = reader.read_all().unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn content_length_too_large() {
        let body = b"small";
        let config = BodyConfig::new().with_max_size(3);
        let result = ContentLengthReader::new(body, 100, &config);

        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 100, max: 3 })
        ));
    }

    #[test]
    fn content_length_incomplete() {
        let body = b"Hello";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 10, &config).unwrap();

        let result = reader.read_all();
        assert!(matches!(
            result,
            Err(BodyError::Incomplete {
                received: 5,
                expected: Some(10)
            })
        ));
    }

    #[test]
    fn content_length_borrowed() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        let borrowed = reader.read_all_borrowed().unwrap();
        assert_eq!(borrowed, body);
        assert_eq!(borrowed.as_ptr(), body.as_ptr());
    }

    #[test]
    fn content_length_incremental_read() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        let mut buf = [0u8; 5];

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b"Hello");
        assert_eq!(reader.remaining(), 8);

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b", Wor");
        assert_eq!(reader.remaining(), 3);

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(&buf[..n], b"ld!");
        assert!(reader.is_complete());

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);
    }

    #[test]
    fn chunked_single_chunk() {
        let body = b"5\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_multiple_chunks() {
        let body = b"5\r\nHello\r\n7\r\n, World\r\n1\r\n!\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_empty() {
        let body = b"0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert!(result.is_empty());
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_with_extension() {
        let body = b"5;ext=value\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
    }

    #[test]
    fn chunked_with_trailers() {
        let body = b"5\r\nHello\r\n0\r\nTrailer: value\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_hex_sizes() {
        let body = b"a\r\n0123456789\r\nF\r\n0123456789ABCDE\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        // 0xa = 10 bytes plus 0xF = 15 bytes.
        assert_eq!(result.len(), 10 + 15);
    }

    #[test]
    fn chunked_too_large() {
        // 0x10 = 16 bytes of chunk data against a 10-byte limit.
        let body = b"10\r\n0123456789ABCDEF\r\n0\r\n\r\n";
        let config = BodyConfig::new().with_max_size(10);
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 16, max: 10 })
        ));
    }

    #[test]
    fn chunked_invalid_size() {
        let body = b"xyz\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding { detail: _ })
        ));
    }

    #[test]
    fn chunked_missing_crlf() {
        // "X" sits where the CRLF terminating the chunk data should be.
        let body = b"5\r\nHelloX0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding {
                detail: "expected CRLF after chunk data"
            })
        ));
    }

    #[test]
    fn chunked_incomplete() {
        // The chunk header promises 5 bytes but only 3 are present.
        let body = b"5\r\nHel";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(result, Err(BodyError::Incomplete { .. })));
    }

    #[test]
    fn parse_body_none() {
        let config = BodyConfig::default();
        let result = parse_body(b"ignored", BodyLength::None, &config).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn parse_body_content_length() {
        let config = BodyConfig::default();
        let result = parse_body(b"Hello, World!", BodyLength::ContentLength(13), &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello, World!");
    }

    #[test]
    fn parse_body_content_length_zero() {
        let config = BodyConfig::default();
        let result = parse_body(b"", BodyLength::ContentLength(0), &config).unwrap();
        assert_eq!(result.unwrap(), b"");
    }

    #[test]
    fn parse_body_chunked() {
        let config = BodyConfig::default();
        let result = parse_body(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello");
    }

    #[test]
    fn parse_body_with_consumed_content_length() {
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"Hello, World!", BodyLength::ContentLength(13), &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello, World!");
        assert_eq!(consumed, 13);
    }

    #[test]
    fn parse_body_with_consumed_chunked() {
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello");
        assert_eq!(consumed, 15);
    }

    #[test]
    fn validate_content_length_ok() {
        let config = BodyConfig::new().with_max_size(1000);
        assert!(validate_content_length(500, &config).is_ok());
        assert!(validate_content_length(1000, &config).is_ok());
    }

    #[test]
    fn validate_content_length_too_large() {
        let config = BodyConfig::new().with_max_size(1000);
        let result = validate_content_length(1001, &config);
        assert!(matches!(
            result,
            Err(BodyError::TooLarge {
                size: 1001,
                max: 1000
            })
        ));
    }

    #[test]
    fn body_error_display() {
        let err = BodyError::TooLarge {
            size: 2000,
            max: 1000,
        };
        assert_eq!(
            format!("{err}"),
            "body too large: 2000 bytes exceeds limit of 1000"
        );

        let err = BodyError::InvalidChunkedEncoding {
            detail: "bad format",
        };
        assert_eq!(format!("{err}"), "invalid chunked encoding: bad format");

        let err = BodyError::Incomplete {
            received: 50,
            expected: Some(100),
        };
        assert_eq!(
            format!("{err}"),
            "incomplete body: received 50 of 100 bytes"
        );

        let err = BodyError::UnexpectedEof;
        assert_eq!(format!("{err}"), "unexpected end of body");
    }

    #[test]
    fn streaming_body_config_defaults() {
        let config = StreamingBodyConfig::default();
        assert_eq!(config.streaming_threshold, DEFAULT_STREAMING_THRESHOLD);
        assert_eq!(config.chunk_size, 8 * 1024);
        assert_eq!(config.max_size, DEFAULT_MAX_BODY_SIZE);
    }

    #[test]
    fn streaming_body_config_custom() {
        let config = StreamingBodyConfig::new()
            .with_streaming_threshold(1024)
            .with_chunk_size(4096)
            .with_max_size(10_000);
        assert_eq!(config.streaming_threshold, 1024);
        assert_eq!(config.chunk_size, 4096);
        assert_eq!(config.max_size, 10_000);
    }

    #[test]
    fn streaming_body_config_minimum_chunk_size() {
        let config = StreamingBodyConfig::new().with_chunk_size(0);
        assert_eq!(config.chunk_size, 1);
    }

    #[test]
    fn streaming_body_config_should_stream() {
        let config = StreamingBodyConfig::new().with_streaming_threshold(1000);
        assert!(!config.should_stream(500));
        assert!(!config.should_stream(1000));
        assert!(config.should_stream(1001));
        assert!(config.should_stream(10000));
    }

    #[test]
    fn async_content_length_stream_from_buffer() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        let buffer = b"Hello, World!".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(5);
        let mut stream = AsyncContentLengthStream::new(buffer, EmptyReader, 13, &config);

        assert_eq!(stream.expected_size(), 13);
        assert_eq!(stream.bytes_read(), 0);
        assert_eq!(stream.remaining(), 13);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk"),
        }
        assert_eq!(stream.bytes_read(), 5);

        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b", Wor");
            }
            _ => panic!("expected chunk"),
        }

        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"ld!");
            }
            _ => panic!("expected chunk"),
        }

        let result = Pin::new(&mut stream).poll_next(&mut cx);
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    #[test]
    fn async_chunked_stream_simple() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        let buffer = b"5\r\nHello\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(1024);
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk, got {:?}", result),
        }

        // The zero-size chunk terminates the stream.
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    #[test]
    fn async_chunked_stream_multiple_chunks() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        let buffer = b"5\r\nHello\r\n8\r\n, World!\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new();
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut collected = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
                Poll::Ready(Some(Err(e))) => panic!("unexpected error: {e}"),
                Poll::Ready(None) => break,
                // Decoding uses only the in-memory buffer, so Pending is not expected here.
                Poll::Pending => {}
            }
        }

        assert_eq!(collected, b"Hello, World!");
    }
}
1799}