1use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::fmt;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
9pub enum HttpVersion {
10 Http10,
12 #[default]
14 Http11,
15}
16
17impl HttpVersion {
18 #[must_use]
20 pub fn parse(s: &str) -> Option<Self> {
21 match s {
22 "HTTP/1.0" => Some(Self::Http10),
23 "HTTP/1.1" => Some(Self::Http11),
24 _ => None,
25 }
26 }
27
28 #[must_use]
30 pub fn is_http11(self) -> bool {
31 matches!(self, Self::Http11)
32 }
33
34 #[must_use]
36 pub fn is_http10(self) -> bool {
37 matches!(self, Self::Http10)
38 }
39
40 #[must_use]
42 pub const fn as_str(self) -> &'static str {
43 match self {
44 Self::Http10 => "HTTP/1.0",
45 Self::Http11 => "HTTP/1.1",
46 }
47 }
48}
49
50impl std::str::FromStr for HttpVersion {
51 type Err = ();
52
53 fn from_str(s: &str) -> Result<Self, Self::Err> {
54 Self::parse(s).ok_or(())
55 }
56}
57
58impl fmt::Display for HttpVersion {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.write_str(self.as_str())
61 }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum Method {
67 Get,
69 Post,
71 Put,
73 Delete,
75 Patch,
77 Options,
79 Head,
81 Trace,
83}
84
85impl Method {
86 #[must_use]
88 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
89 match bytes {
90 b"GET" => Some(Self::Get),
91 b"POST" => Some(Self::Post),
92 b"PUT" => Some(Self::Put),
93 b"DELETE" => Some(Self::Delete),
94 b"PATCH" => Some(Self::Patch),
95 b"OPTIONS" => Some(Self::Options),
96 b"HEAD" => Some(Self::Head),
97 b"TRACE" => Some(Self::Trace),
98 _ => None,
99 }
100 }
101
102 #[must_use]
104 pub const fn as_str(self) -> &'static str {
105 match self {
106 Self::Get => "GET",
107 Self::Post => "POST",
108 Self::Put => "PUT",
109 Self::Delete => "DELETE",
110 Self::Patch => "PATCH",
111 Self::Options => "OPTIONS",
112 Self::Head => "HEAD",
113 Self::Trace => "TRACE",
114 }
115 }
116}
117
118impl fmt::Display for Method {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 f.write_str(self.as_str())
121 }
122}
123
124#[derive(Debug, Default)]
129pub struct Headers {
130 inner: HashMap<String, Vec<u8>>,
131}
132
133impl Headers {
134 #[must_use]
136 pub fn new() -> Self {
137 Self::default()
138 }
139
140 #[must_use]
144 pub fn get(&self, name: &str) -> Option<&[u8]> {
145 self.inner
146 .get(lowercase_header_key(name).as_ref())
147 .map(Vec::as_slice)
148 }
149
150 pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Vec<u8>>) {
154 self.inner
155 .insert(name.into().to_ascii_lowercase(), value.into());
156 }
157
158 #[inline]
165 pub fn insert_from_slice(&mut self, name: &str, value: &[u8]) {
166 let name_owned = if name.bytes().any(|b| b.is_ascii_uppercase()) {
168 name.to_ascii_lowercase()
170 } else {
171 name.to_owned()
173 };
174 self.inner.insert(name_owned, value.to_vec());
175 }
176
177 #[inline]
185 pub fn insert_lowercase(&mut self, name: String, value: Vec<u8>) {
186 debug_assert!(
187 !name.bytes().any(|b| b.is_ascii_uppercase()),
188 "insert_lowercase called with non-lowercase name: {}",
189 name
190 );
191 self.inner.insert(name, value);
192 }
193
194 pub fn iter(&self) -> impl Iterator<Item = (&str, &[u8])> {
196 self.inner
197 .iter()
198 .map(|(name, value)| (name.as_str(), value.as_slice()))
199 }
200
201 #[must_use]
203 pub fn len(&self) -> usize {
204 self.inner.len()
205 }
206
207 #[must_use]
209 pub fn is_empty(&self) -> bool {
210 self.inner.is_empty()
211 }
212
213 pub fn remove(&mut self, name: &str) -> Option<Vec<u8>> {
217 self.inner.remove(lowercase_header_key(name).as_ref())
218 }
219
220 #[must_use]
222 pub fn contains(&self, name: &str) -> bool {
223 self.inner.contains_key(lowercase_header_key(name).as_ref())
224 }
225}
226
227#[inline]
236fn lowercase_header_key(name: &str) -> std::borrow::Cow<'_, str> {
237 let needs_lowercase = name.as_bytes().iter().any(|&b| b.is_ascii_uppercase());
240
241 if needs_lowercase {
242 std::borrow::Cow::Owned(name.to_ascii_lowercase())
243 } else {
244 std::borrow::Cow::Borrowed(name)
245 }
246}
247
248#[derive(Debug)]
250pub enum Body {
251 Empty,
253 Bytes(Vec<u8>),
255 Stream(RequestBodyStream),
262}
263
264#[derive(Debug)]
266pub enum RequestBodyStreamError {
267 ConnectionClosed,
269 Timeout,
271 TooLarge { received: usize, max: usize },
273 Io(String),
275}
276
277impl std::fmt::Display for RequestBodyStreamError {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 match self {
280 Self::ConnectionClosed => write!(f, "connection closed"),
281 Self::Timeout => write!(f, "timeout waiting for body data"),
282 Self::TooLarge { received, max } => {
283 write!(f, "body too large: {received} bytes exceeds limit of {max}")
284 }
285 Self::Io(msg) => write!(f, "I/O error: {msg}"),
286 }
287 }
288}
289
290impl std::error::Error for RequestBodyStreamError {}
291
292pub struct RequestBodyStream {
317 inner: std::pin::Pin<
319 Box<
320 dyn asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
321 + Send
322 + Sync,
323 >,
324 >,
325 bytes_received: usize,
327 expected_size: Option<usize>,
329 complete: bool,
331}
332
333impl std::fmt::Debug for RequestBodyStream {
334 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335 f.debug_struct("RequestBodyStream")
336 .field("bytes_received", &self.bytes_received)
337 .field("expected_size", &self.expected_size)
338 .field("complete", &self.complete)
339 .finish_non_exhaustive()
340 }
341}
342
343impl RequestBodyStream {
344 pub fn new<S>(stream: S) -> Self
346 where
347 S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
348 + Send
349 + Sync
350 + 'static,
351 {
352 Self {
353 inner: Box::pin(stream),
354 bytes_received: 0,
355 expected_size: None,
356 complete: false,
357 }
358 }
359
360 pub fn with_expected_size<S>(stream: S, expected_size: usize) -> Self
362 where
363 S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
364 + Send
365 + Sync
366 + 'static,
367 {
368 Self {
369 inner: Box::pin(stream),
370 bytes_received: 0,
371 expected_size: Some(expected_size),
372 complete: false,
373 }
374 }
375
376 #[must_use]
378 pub fn bytes_received(&self) -> usize {
379 self.bytes_received
380 }
381
382 #[must_use]
384 pub fn expected_size(&self) -> Option<usize> {
385 self.expected_size
386 }
387
388 #[must_use]
390 pub fn is_complete(&self) -> bool {
391 self.complete
392 }
393
394 pub async fn collect(mut self) -> Result<Vec<u8>, RequestBodyStreamError> {
401 use asupersync::stream::StreamExt;
402
403 let capacity = self.expected_size.unwrap_or(4096);
404 let mut buffer = Vec::with_capacity(capacity);
405
406 while let Some(chunk) = self.inner.next().await {
407 buffer.extend_from_slice(&chunk?);
408 self.bytes_received = buffer.len();
409 }
410
411 self.complete = true;
412 Ok(buffer)
413 }
414}
415
416impl asupersync::stream::Stream for RequestBodyStream {
417 type Item = Result<Vec<u8>, RequestBodyStreamError>;
418
419 fn poll_next(
420 mut self: std::pin::Pin<&mut Self>,
421 cx: &mut std::task::Context<'_>,
422 ) -> std::task::Poll<Option<Self::Item>> {
423 if self.complete {
424 return std::task::Poll::Ready(None);
425 }
426
427 match self.inner.as_mut().poll_next(cx) {
428 std::task::Poll::Ready(Some(Ok(chunk))) => {
429 self.bytes_received += chunk.len();
430 std::task::Poll::Ready(Some(Ok(chunk)))
431 }
432 std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
433 std::task::Poll::Ready(None) => {
434 self.complete = true;
435 std::task::Poll::Ready(None)
436 }
437 std::task::Poll::Pending => std::task::Poll::Pending,
438 }
439 }
440}
441
442impl Body {
443 #[must_use]
448 pub fn into_bytes(self) -> Vec<u8> {
449 match self {
450 Self::Empty => Vec::new(),
451 Self::Bytes(b) => b,
452 Self::Stream(_) => panic!(
453 "cannot synchronously convert streaming body to bytes; use into_bytes_async()"
454 ),
455 }
456 }
457
458 pub async fn into_bytes_async(self) -> Result<Vec<u8>, RequestBodyStreamError> {
469 match self {
470 Self::Empty => Ok(Vec::new()),
471 Self::Bytes(b) => Ok(b),
472 Self::Stream(stream) => stream.collect().await,
473 }
474 }
475
476 #[must_use]
478 pub fn is_empty(&self) -> bool {
479 match self {
480 Self::Empty => true,
481 Self::Bytes(b) => b.is_empty(),
482 Self::Stream(s) => s.is_complete() && s.bytes_received() == 0,
483 }
484 }
485
486 #[must_use]
488 pub fn is_streaming(&self) -> bool {
489 matches!(self, Self::Stream(_))
490 }
491
492 #[must_use]
496 pub fn take_stream(self) -> Option<RequestBodyStream> {
497 match self {
498 Self::Stream(s) => Some(s),
499 _ => None,
500 }
501 }
502
503 pub fn streaming<S>(stream: S) -> Self
505 where
506 S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
507 + Send
508 + Sync
509 + 'static,
510 {
511 Self::Stream(RequestBodyStream::new(stream))
512 }
513
514 pub fn streaming_with_size<S>(stream: S, size: usize) -> Self
516 where
517 S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
518 + Send
519 + Sync
520 + 'static,
521 {
522 Self::Stream(RequestBodyStream::with_expected_size(stream, size))
523 }
524}
525
526#[derive(Debug)]
528pub struct Request {
529 method: Method,
530 path: String,
531 query: Option<String>,
532 version: HttpVersion,
533 headers: Headers,
534 body: Body,
535 #[allow(dead_code)] extensions: HashMap<std::any::TypeId, Box<dyn std::any::Any + Send + Sync>>,
538}
539
540impl Request {
541 #[must_use]
543 pub fn new(method: Method, path: impl Into<String>) -> Self {
544 Self {
545 method,
546 path: path.into(),
547 query: None,
548 version: HttpVersion::default(),
549 headers: Headers::new(),
550 body: Body::Empty,
551 extensions: HashMap::new(),
552 }
553 }
554
555 #[must_use]
557 pub fn with_version(method: Method, path: impl Into<String>, version: HttpVersion) -> Self {
558 Self {
559 method,
560 path: path.into(),
561 query: None,
562 version,
563 headers: Headers::new(),
564 body: Body::Empty,
565 extensions: HashMap::new(),
566 }
567 }
568
569 #[must_use]
571 pub fn version(&self) -> HttpVersion {
572 self.version
573 }
574
575 pub fn set_version(&mut self, version: HttpVersion) {
577 self.version = version;
578 }
579
580 #[must_use]
582 pub fn method(&self) -> Method {
583 self.method
584 }
585
586 #[must_use]
588 pub fn path(&self) -> &str {
589 &self.path
590 }
591
592 pub fn set_path(&mut self, path: String) {
597 self.path = path;
598 }
599
600 #[must_use]
602 pub fn query(&self) -> Option<&str> {
603 self.query.as_deref()
604 }
605
606 #[must_use]
608 pub fn headers(&self) -> &Headers {
609 &self.headers
610 }
611
612 pub fn headers_mut(&mut self) -> &mut Headers {
614 &mut self.headers
615 }
616
617 #[must_use]
619 pub fn body(&self) -> &Body {
620 &self.body
621 }
622
623 pub fn take_body(&mut self) -> Body {
625 std::mem::replace(&mut self.body, Body::Empty)
626 }
627
628 pub fn set_body(&mut self, body: Body) {
630 self.body = body;
631 }
632
633 pub fn set_query(&mut self, query: Option<String>) {
635 self.query = query;
636 }
637
638 pub fn insert_extension<T: Any + Send + Sync>(&mut self, value: T) {
640 self.extensions.insert(TypeId::of::<T>(), Box::new(value));
641 }
642
643 #[must_use]
645 pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<&T> {
646 self.extensions
647 .get(&TypeId::of::<T>())
648 .and_then(|boxed| boxed.downcast_ref::<T>())
649 }
650
651 pub fn get_extension_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
653 self.extensions
654 .get_mut(&TypeId::of::<T>())
655 .and_then(|boxed| boxed.downcast_mut::<T>())
656 }
657}
658
659#[cfg(test)]
660mod tests {
661 use super::*;
662 use asupersync::stream::Stream;
663 use std::pin::Pin;
664 use std::sync::Arc;
665 use std::task::{Context, Poll, Wake, Waker};
666
667 struct NoopWaker;
668
669 impl Wake for NoopWaker {
670 fn wake(self: Arc<Self>) {}
671 }
672
673 fn noop_waker() -> Waker {
674 Waker::from(Arc::new(NoopWaker))
675 }
676
677 #[test]
682 fn stream_10mb_body_in_64kb_chunks() {
683 const TARGET_SIZE: usize = 10 * 1024 * 1024; const CHUNK_SIZE: usize = 64 * 1024; let num_chunks = TARGET_SIZE.div_ceil(CHUNK_SIZE);
689 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = (0..num_chunks)
690 .map(|i| {
691 let start = i * CHUNK_SIZE;
692 let end = std::cmp::min(start + CHUNK_SIZE, TARGET_SIZE);
693 let chunk: Vec<u8> = (start..end).map(|j| (j % 256) as u8).collect();
694 Ok(chunk)
695 })
696 .collect();
697
698 let stream = asupersync::stream::iter(chunks);
699 let mut body_stream = RequestBodyStream::with_expected_size(stream, TARGET_SIZE);
700
701 let waker = noop_waker();
702 let mut ctx = Context::from_waker(&waker);
703
704 let mut total_received = 0usize;
705 let mut chunk_count = 0usize;
706
707 loop {
708 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
709 Poll::Ready(Some(Ok(chunk))) => {
710 total_received += chunk.len();
711 chunk_count += 1;
712 }
713 Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
714 Poll::Ready(None) => break,
715 Poll::Pending => panic!("Mock stream should never return Pending"),
716 }
717 }
718
719 assert_eq!(total_received, TARGET_SIZE, "Should receive all 10MB");
720 assert_eq!(
721 chunk_count, num_chunks,
722 "Should have correct number of chunks"
723 );
724 assert!(
725 body_stream.is_complete(),
726 "Stream should be marked complete"
727 );
728 assert_eq!(
729 body_stream.bytes_received(),
730 TARGET_SIZE,
731 "bytes_received should match"
732 );
733 }
734
735 #[test]
736 fn stream_memory_bounded_during_processing() {
737 const TARGET_SIZE: usize = 5 * 1024 * 1024; const CHUNK_SIZE: usize = 64 * 1024; const MAX_MEMORY: usize = 1024 * 1024; let num_chunks = TARGET_SIZE / CHUNK_SIZE;
744 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
745 (0..num_chunks).map(|_| Ok(vec![0u8; CHUNK_SIZE])).collect();
746
747 let stream = asupersync::stream::iter(chunks);
748 let mut body_stream = RequestBodyStream::new(stream);
749
750 let waker = noop_waker();
751 let mut ctx = Context::from_waker(&waker);
752
753 let mut processed_total = 0usize;
755 let mut max_held = 0usize;
756
757 loop {
758 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
759 Poll::Ready(Some(Ok(chunk))) => {
760 let chunk_size = chunk.len();
762 max_held = std::cmp::max(max_held, chunk_size);
763 processed_total += chunk_size;
764 }
766 Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
767 Poll::Ready(None) => break,
768 Poll::Pending => panic!("Mock stream should never return Pending"),
769 }
770 }
771
772 assert_eq!(processed_total, TARGET_SIZE, "Should process all data");
773 assert!(
774 max_held <= MAX_MEMORY,
775 "Max memory held per chunk ({max_held}) should be < {MAX_MEMORY}"
776 );
777 }
778
779 #[test]
780 fn stream_error_connection_closed() {
781 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![
783 Ok(vec![1, 2, 3]),
784 Err(RequestBodyStreamError::ConnectionClosed),
785 Ok(vec![4, 5, 6]), ];
787
788 let stream = asupersync::stream::iter(chunks);
789 let mut body_stream = RequestBodyStream::new(stream);
790
791 let waker = noop_waker();
792 let mut ctx = Context::from_waker(&waker);
793
794 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
796 Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, vec![1, 2, 3]),
797 other => panic!("Expected first chunk, got {other:?}"),
798 }
799
800 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
802 Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed))) => {}
803 other => panic!("Expected ConnectionClosed error, got {other:?}"),
804 }
805 }
806
807 #[test]
808 fn stream_error_timeout() {
809 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
811 vec![Ok(vec![1, 2]), Err(RequestBodyStreamError::Timeout)];
812
813 let stream = asupersync::stream::iter(chunks);
814 let mut body_stream = RequestBodyStream::new(stream);
815
816 let waker = noop_waker();
817 let mut ctx = Context::from_waker(&waker);
818
819 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
821 Poll::Ready(Some(Ok(_))) => {}
822 other => panic!("Expected first chunk, got {other:?}"),
823 }
824
825 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
827 Poll::Ready(Some(Err(RequestBodyStreamError::Timeout))) => {}
828 other => panic!("Expected Timeout error, got {other:?}"),
829 }
830
831 let err = RequestBodyStreamError::Timeout;
833 assert_eq!(format!("{err}"), "timeout waiting for body data");
834 }
835
836 #[test]
837 fn stream_error_too_large() {
838 let err = RequestBodyStreamError::TooLarge {
840 received: 10_000_000,
841 max: 1_000_000,
842 };
843
844 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Err(err)];
845
846 let stream = asupersync::stream::iter(chunks);
847 let mut body_stream = RequestBodyStream::new(stream);
848
849 let waker = noop_waker();
850 let mut ctx = Context::from_waker(&waker);
851
852 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
853 Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge { received, max }))) => {
854 assert_eq!(received, 10_000_000);
855 assert_eq!(max, 1_000_000);
856 }
857 other => panic!("Expected TooLarge error, got {other:?}"),
858 }
859
860 let err = RequestBodyStreamError::TooLarge {
862 received: 10_000_000,
863 max: 1_000_000,
864 };
865 assert!(format!("{err}").contains("10000000"));
866 assert!(format!("{err}").contains("1000000"));
867 }
868
869 #[test]
870 fn stream_error_io() {
871 let err = RequestBodyStreamError::Io("disk full".to_string());
873
874 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Err(err)];
875
876 let stream = asupersync::stream::iter(chunks);
877 let mut body_stream = RequestBodyStream::new(stream);
878
879 let waker = noop_waker();
880 let mut ctx = Context::from_waker(&waker);
881
882 match Pin::new(&mut body_stream).poll_next(&mut ctx) {
883 Poll::Ready(Some(Err(RequestBodyStreamError::Io(msg)))) => {
884 assert_eq!(msg, "disk full");
885 }
886 other => panic!("Expected Io error, got {other:?}"),
887 }
888
889 let err = RequestBodyStreamError::Io("disk full".to_string());
891 assert!(format!("{err}").contains("disk full"));
892 }
893
894 #[test]
895 fn stream_expected_size_tracking() {
896 const EXPECTED: usize = 1024;
898
899 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
900 vec![Ok(vec![0u8; 512]), Ok(vec![0u8; 512])];
901
902 let stream = asupersync::stream::iter(chunks);
903 let body_stream = RequestBodyStream::with_expected_size(stream, EXPECTED);
904
905 assert_eq!(body_stream.expected_size(), Some(EXPECTED));
906 assert_eq!(body_stream.bytes_received(), 0);
907 assert!(!body_stream.is_complete());
908 }
909
910 #[test]
911 fn stream_collect_accumulates_all_chunks() {
912 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
914 vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5]), Ok(vec![6, 7, 8, 9])];
915
916 let stream = asupersync::stream::iter(chunks);
917 let body_stream = RequestBodyStream::new(stream);
918
919 let result = futures_executor::block_on(body_stream.collect());
920 assert!(result.is_ok());
921 let data = result.unwrap();
922 assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
923 }
924
925 #[test]
926 fn stream_collect_propagates_error() {
927 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![
929 Ok(vec![1, 2, 3]),
930 Err(RequestBodyStreamError::ConnectionClosed),
931 Ok(vec![4, 5, 6]),
932 ];
933
934 let stream = asupersync::stream::iter(chunks);
935 let body_stream = RequestBodyStream::new(stream);
936
937 let result = futures_executor::block_on(body_stream.collect());
938 assert!(result.is_err());
939 match result {
940 Err(RequestBodyStreamError::ConnectionClosed) => {}
941 other => panic!("Expected ConnectionClosed, got {other:?}"),
942 }
943 }
944
945 #[test]
946 fn body_streaming_helper_creates_stream() {
947 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
949 let stream = asupersync::stream::iter(chunks);
950 let body = Body::streaming(stream);
951
952 assert!(body.is_streaming());
953 assert!(!body.is_empty());
954 }
955
956 #[test]
957 fn body_streaming_with_size_helper() {
958 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
960 let stream = asupersync::stream::iter(chunks);
961 let body = Body::streaming_with_size(stream, 3);
962
963 assert!(body.is_streaming());
964
965 if let Body::Stream(s) = body {
966 assert_eq!(s.expected_size(), Some(3));
967 } else {
968 panic!("Expected Body::Stream");
969 }
970 }
971
972 #[test]
973 fn body_into_bytes_async_handles_stream() {
974 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
976 vec![Ok(vec![1, 2]), Ok(vec![3, 4])];
977 let stream = asupersync::stream::iter(chunks);
978 let body = Body::streaming(stream);
979
980 let result = futures_executor::block_on(body.into_bytes_async());
981 assert!(result.is_ok());
982 assert_eq!(result.unwrap(), vec![1, 2, 3, 4]);
983 }
984
985 #[test]
986 fn body_take_stream_extracts_stream() {
987 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
989 let stream = asupersync::stream::iter(chunks);
990 let body = Body::streaming(stream);
991
992 let taken = body.take_stream();
993 assert!(taken.is_some());
994
995 let empty_body = Body::Empty;
997 assert!(empty_body.take_stream().is_none());
998
999 let bytes_body = Body::Bytes(vec![1, 2, 3]);
1000 assert!(bytes_body.take_stream().is_none());
1001 }
1002
1003 #[test]
1004 #[should_panic(expected = "cannot synchronously convert streaming body")]
1005 fn body_into_bytes_panics_for_stream() {
1006 let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
1008 let stream = asupersync::stream::iter(chunks);
1009 let body = Body::streaming(stream);
1010
1011 let _ = body.into_bytes(); }
1013
1014 #[test]
1019 fn headers_lowercase_key_fast_path() {
1020 let mut headers = Headers::new();
1022 headers.insert("content-type", b"application/json".to_vec());
1023
1024 assert!(headers.get("content-type").is_some());
1026 assert!(headers.contains("content-type"));
1027
1028 assert!(headers.get("Content-Type").is_some());
1030 assert!(headers.contains("CONTENT-TYPE"));
1031 }
1032
1033 #[test]
1034 fn headers_case_insensitive_lookup() {
1035 let mut headers = Headers::new();
1037 headers.insert("X-Custom-Header", b"value".to_vec());
1038
1039 assert_eq!(headers.get("x-custom-header"), Some(b"value".as_slice()));
1041 assert_eq!(headers.get("X-CUSTOM-HEADER"), Some(b"value".as_slice()));
1042 assert_eq!(headers.get("X-Custom-Header"), Some(b"value".as_slice()));
1043 assert_eq!(headers.get("x-CuStOm-HeAdEr"), Some(b"value".as_slice()));
1044 }
1045
1046 #[test]
1047 fn headers_remove_case_insensitive() {
1048 let mut headers = Headers::new();
1050 headers.insert("Authorization", b"Bearer token".to_vec());
1051
1052 let removed = headers.remove("AUTHORIZATION");
1054 assert_eq!(removed, Some(b"Bearer token".to_vec()));
1055 assert!(!headers.contains("authorization"));
1056 }
1057
1058 #[test]
1059 fn lowercase_header_key_already_lowercase() {
1060 use std::borrow::Cow;
1062
1063 let result = lowercase_header_key("content-type");
1064 assert!(matches!(result, Cow::Borrowed(_)));
1065 assert_eq!(result.as_ref(), "content-type");
1066 }
1067
1068 #[test]
1069 fn lowercase_header_key_needs_conversion() {
1070 use std::borrow::Cow;
1072
1073 let result = lowercase_header_key("Content-Type");
1074 assert!(matches!(result, Cow::Owned(_)));
1075 assert_eq!(result.as_ref(), "content-type");
1076 }
1077
1078 #[test]
1079 fn lowercase_header_key_all_uppercase() {
1080 use std::borrow::Cow;
1081
1082 let result = lowercase_header_key("CONTENT-TYPE");
1083 assert!(matches!(result, Cow::Owned(_)));
1084 assert_eq!(result.as_ref(), "content-type");
1085 }
1086}