http_kit/body/mod.rs
1//! HTTP request/response body handling.
2//!
3//! This module provides a flexible [`Body`] type that can represent HTTP request and response bodies
4//! in various forms while maintaining efficiency and type safety.
5//!
6//! # Body Representation
7//!
8//! The body can hold data in different forms:
9//!
10//! - **Bytes**: For simple in-memory bodies that fit entirely in memory
11//! - **AsyncReader**: For streaming from files or other async sources
12//! - **Stream**: For general async streaming data with backpressure support
13//! - **Frozen**: For consumed bodies that can no longer provide data
14//!
15//! # Format Support
16//!
17//! The body type provides convenient methods for working with common formats:
18//!
19//! - **JSON** (with `json` feature): Serialize/deserialize to/from JSON
20//! - **URL-encoded forms** (with `form` feature): Handle form data
21//! - **Files** (with `fs` feature): Stream file contents with MIME detection
22//! - **Raw bytes**: Direct byte manipulation and string conversion
23//!
24//! # Examples
25//!
26//! ## Basic Usage
27//!
28//! ```rust
29//! use http_kit::Body;
30//!
31//! // Create empty body
32//! let empty = Body::empty();
33//!
34//! // Create from string
35//! let text = Body::from_bytes("Hello world!");
36//!
37//! // Create from bytes
38//! let data = Body::from_bytes(vec![1, 2, 3, 4]);
39//! ```
40//!
41//! ## JSON Handling
42//!
43/// ```rust
44/// # #[cfg(feature = "json")]
45/// # {
46/// use http_kit::Body;
47/// use serde::{Serialize, Deserialize};
48///
49/// #[derive(Serialize, Deserialize)]
50/// struct User { name: String }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// // Create from JSON
54/// let user = User { name: "Alice".to_string() };
55/// let body = Body::from_json(&user)?;
56///
57/// // Parse to JSON
58/// let mut body = Body::from_bytes(r#"{"name":"Bob"}"#);
59/// let user: User = body.into_json().await?;
60/// # Ok(())
61/// # }
62/// # }
63/// ```
64//
65// ## File Streaming
66//
67// ```rust,no_run
68// # #[cfg(feature = "fs")]
69// # {
70// use http_kit::Body;
71//
72// // Stream file contents
73// let body = Body::from_file("large_file.dat").await?;
74// # }
75// # Ok::<(), std::io::Error>(())
76// ```
77mod convert;
78mod error_type;
79#[cfg(feature = "std")]
80mod utils;
81use crate::sse::{Event, SseStream};
82pub use error_type::Error;
83#[cfg(feature = "std")]
84extern crate std;
85use futures_lite::{ready, Stream, StreamExt};
86use http_body::Frame;
87use http_body_util::{BodyExt, StreamBody};
88use mime::Mime;
89
90#[cfg(feature = "std")]
91use self::utils::IntoAsyncRead;
92use bytestr::ByteStr;
93
94use bytes::Bytes;
95use futures_lite::{AsyncBufRead, AsyncBufReadExt};
96
97use alloc::{boxed::Box, vec::Vec};
98use core::fmt::Debug;
99use core::mem::{replace, swap, take};
100use core::pin::Pin;
101use core::task::{Context, Poll};
102
103// A boxed bufreader object.
104type BoxBufReader = Pin<Box<dyn AsyncBufRead + Send + Sync + 'static>>;
105
106type BoxHttpBody =
107 Pin<Box<dyn http_body::Body<Data = Bytes, Error = Error> + Send + Sync + 'static>>;
108
109pub use http_body::Body as HttpBody;
110
111/// Flexible HTTP body that can represent data in various forms.
112///
113/// `Body` is the core type for representing HTTP request and response bodies.
114/// It can efficiently handle different data sources:
115///
116/// - **In-memory data**: Bytes, strings, vectors
117/// - **Streaming data**: Files, network streams, async readers
118/// - **Structured data**: JSON, form data (with appropriate features)
119///
120/// The body automatically manages the underlying representation and provides
121/// zero-copy conversions where possible.
122///
123/// # Examples
124///
125/// ```rust
126/// use http_kit::Body;
127///
128/// // Create from string
129/// let body = Body::from_bytes("Hello, world!");
130///
131/// // Create empty body
132/// let empty = Body::empty();
133///
134/// // Check if empty (when size is known)
135/// if let Some(true) = body.is_empty() {
136/// println!("Body is empty");
137/// }
138/// ```
139pub struct Body {
140 mime: Option<Mime>,
141 inner: BodyInner,
142}
143
144impl Debug for Body {
145 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
146 f.write_str("Body")
147 }
148}
149
150impl_error!(
151 BodyFrozen,
152 "Body was frozen,it may have been consumed by `take()`"
153);
154
155enum BodyInner {
156 Once(Bytes),
157 Reader {
158 reader: BoxBufReader,
159 length: Option<usize>,
160 },
161 HttpBody(BoxHttpBody),
162 Freeze,
163}
164
165impl Default for BodyInner {
166 fn default() -> Self {
167 Self::Once(Bytes::new())
168 }
169}
170
171impl Body {
172 /// Creates a new empty body.
173 ///
174 /// This creates a body with zero bytes that can be used as a placeholder
175 /// or default value.
176 ///
177 /// # Examples
178 ///
179 /// ```rust
180 /// use http_kit::Body;
181 ///
182 /// let body = Body::empty();
183 /// assert_eq!(body.len(), Some(0));
184 /// ```
185 pub const fn empty() -> Self {
186 Self {
187 mime: None,
188 inner: BodyInner::Once(Bytes::new()),
189 }
190 }
191
192 /// Creates a new body from any type implementing `http_body::Body`.
193 ///
194 /// This method allows wrapping any HTTP body implementation into this
195 /// `Body` type, providing a unified interface for different body sources.
196 /// The body data will be converted to `Bytes` and errors will be boxed.
197 ///
198 /// # Type Parameters
199 ///
200 /// * `B` - The body type implementing `http_body::Body`
201 ///
202 /// # Examples
203 ///
204 /// ```rust
205 /// use http_kit::Body;
206 /// use http_body_util::Full;
207 /// use bytes::Bytes;
208 ///
209 /// let http_body = Full::new(Bytes::from("Hello, world!"));
210 /// let body = Body::new(http_body);
211 /// ```
212 pub fn new<B>(body: B) -> Self
213 where
214 B: Send + Sync + http_body::Body + 'static,
215 B::Data: Into<Bytes>,
216 B::Error: Into<Error>,
217 {
218 Self {
219 mime: None,
220 inner: BodyInner::HttpBody(Box::pin(
221 body.map_frame(|result| result.map_data(|data| data.into()))
222 .map_err(|e| e.into()),
223 )),
224 }
225 }
226
227 /// Creates a new frozen body that cannot provide data.
228 ///
229 ///
230 ///
231 /// A frozen body represents a body that has been consumed and can no longer
232 /// provide data. This is typically used internally after a body has been
233 /// taken or consumed.
234 ///
235 /// # Examples
236 ///
237 /// ```rust
238 /// use http_kit::Body;
239 ///
240 /// let body = Body::frozen();
241 /// assert!(body.is_frozen());
242 /// ```
243 pub const fn frozen() -> Self {
244 Self {
245 mime: None,
246 inner: BodyInner::Freeze,
247 }
248 }
249
250 /// Creates a body from an async buffered reader.
251 ///
252 /// This method allows streaming data from any source that implements
253 /// `AsyncBufRead`, such as files, network connections, or in-memory buffers.
254 /// The optional length hint can improve performance for operations that
255 /// benefit from knowing the total size.
256 ///
257 /// You are responsible for setting the MIME type of the body.
258 ///
259 /// # Arguments
260 ///
261 /// * `reader` - Any type implementing `AsyncBufRead + Send + 'static`
262 /// * `length` - Optional hint about the total number of bytes to read
263 ///
264 /// # Examples
265 ///
266 /// ```rust,no_run
267 /// # #[cfg(feature = "fs")]
268 /// # {
269 /// use http_kit::Body;
270 /// use async_fs::File;
271 /// use futures_lite::io::BufReader;
272 ///
273 /// # async fn example() -> Result<(), http_kit::BodyError> {
274 /// let file = File::open("data.txt").await?;
275 /// let metadata = file.metadata().await?;
276 /// let reader = BufReader::new(file);
277 ///
278 /// let body = Body::from_reader(reader, metadata.len() as usize);
279 /// # Ok(())
280 /// # }
281 /// # }
282 /// ```
283 pub fn from_reader(
284 reader: impl AsyncBufRead + Send + Sync + 'static,
285 length: impl Into<Option<usize>>,
286 ) -> Self {
287 Self {
288 mime: None,
289 inner: BodyInner::Reader {
290 reader: Box::pin(reader),
291 length: length.into(),
292 },
293 }
294 }
295
296 /// Creates a body from an async stream of data chunks.
297 ///
298 /// This method allows creating a body from any stream that yields
299 /// `Result<T, E>` where `T` can be converted to `Bytes`. This is useful
300 /// for handling data from network sources, databases, or custom generators.
301 ///
302 /// You are responsible for setting the MIME type of the body.
303 ///
304 /// # Type Parameters
305 ///
306 /// * `T` - Data type that can be converted to `Bytes`
307 /// * `E` - Error type that can be converted to a boxed error
308 /// * `S` - Stream type yielding `Result<T, E>`
309 ///
310 /// # Examples
311 ///
312 /// ```rust
313 /// use http_kit::Body;
314 /// use futures_lite::stream;
315 ///
316 /// # async fn example() {
317 /// let data_stream = stream::iter(vec![
318 /// Ok::<_, std::io::Error>("Hello, ".as_bytes()),
319 /// Ok("world!".as_bytes()),
320 /// ]);
321 ///
322 /// let body = Body::from_stream(data_stream);
323 /// # }
324 /// ```
325 pub fn from_stream<T, E, S>(stream: S) -> Self
326 where
327 T: Into<Bytes> + Send + 'static,
328 E: Into<Error>,
329 S: Stream<Item = Result<T, E>> + Send + Sync + 'static,
330 {
331 Self {
332 mime: None,
333 inner: BodyInner::HttpBody(Box::pin(StreamBody::new(stream.map(|result| {
334 result
335 .map(|data| Frame::data(data.into()))
336 .map_err(|error| error.into())
337 })))),
338 }
339 }
340 /// Creates a body from bytes or byte-like data.
341 ///
342 /// This method accepts any type that can be converted to `Bytes`,
343 /// including `String`, `Vec<u8>`, `&str`, `&[u8]`, and `Bytes` itself.
344 /// The conversion is zero-copy when possible.
345 ///
346 /// By default, the MIME type is set to `application/octet-stream`.
347 ///
348 /// # Examples
349 ///
350 /// ```rust
351 /// use http_kit::Body;
352 ///
353 /// // From string slice
354 /// let body1 = Body::from_bytes("Hello, world!");
355 ///
356 /// // From String
357 /// let body2 = Body::from_bytes("Hello, world!".to_string());
358 ///
359 /// // From byte vector
360 /// let body3 = Body::from_bytes(vec![72, 101, 108, 108, 111]);
361 /// ```
362 pub fn from_bytes(data: impl Into<Bytes>) -> Self {
363 Self {
364 mime: Some(mime::APPLICATION_OCTET_STREAM),
365 inner: BodyInner::Once(data.into()),
366 }
367 }
368
369 /// Creates a body from a string slice.
370 ///
371 ///
372 /// This method accepts any type that can be converted to `ByteStr`,
373 /// including `String`, `&str`, and `ByteStr` itself.
374 /// The conversion is zero-copy when possible.
375 ///
376 /// By default, the MIME type is set to `text/plain; charset=utf-8`.
377 ///
378 /// # Examples
379 ///
380 /// ```rust
381 /// use http_kit::Body;
382 /// use bytestr::ByteStr;
383 ///
384 /// // From string slice
385 /// let body1 = Body::from_text("Hello, world!");
386 ///
387 /// // From String
388 /// let body2 = Body::from_text("Hello, world!".to_string());
389 /// ```
390 pub fn from_text(str: impl Into<ByteStr>) -> Self {
391 Self {
392 mime: Some(mime::TEXT_PLAIN_UTF_8),
393 inner: BodyInner::Once(str.into().into()),
394 }
395 }
396
397 /// Creates a body by streaming the contents of a file.
398 ///
399 /// This method opens a file and creates a streaming body that reads
400 /// the file contents on demand. The file size is determined automatically
401 /// and used as a length hint for optimization.
402 ///
403 /// # Arguments
404 ///
405 /// * `path` - Path to the file to read
406 ///
407 /// # Errors
408 ///
409 /// Returns an `std::io::Error` if the file cannot be opened or its metadata
410 /// cannot be read.
411 ///
412 /// # Examples
413 ///
414 /// ```rust,no_run
415 /// # #[cfg(feature = "fs")]
416 /// # {
417 /// use http_kit::Body;
418 ///
419 /// # async fn example() -> Result<(), std::io::Error> {
420 /// let body = Body::from_file("large_document.pdf").await?;
421 /// println!("File body created with {} bytes", body.len().unwrap_or(0));
422 /// # Ok(())
423 /// # }
424 /// # }
425 /// ```
426 #[cfg(all(feature = "fs", feature = "std"))]
427 pub async fn from_file(path: impl AsRef<std::path::Path>) -> Result<Self, std::io::Error> {
428 let path = path.as_ref();
429 let file = async_fs::File::open(path).await?;
430 let len = file.metadata().await?.len() as usize;
431 let mime = if let Some(ext) = path.extension() {
432 if let Some(ext_str) = ext.to_str() {
433 Self::guess(ext_str.as_bytes()).and_then(|m| m.parse().ok())
434 } else {
435 None
436 }
437 } else {
438 None
439 };
440 Ok(Self {
441 mime,
442 ..Self::from_reader(futures_lite::io::BufReader::new(file), len)
443 })
444 }
445
446 /// Creates a body by serializing an object to JSON.
447 ///
448 /// This method serializes any `Serialize` type to JSON and creates
449 /// a body containing the JSON string. The resulting body will have
450 /// UTF-8 encoded JSON content.
451 ///
452 /// By default, the MIME type is set to `application/json`.
453 ///
454 /// # Arguments
455 ///
456 /// * `value` - Any type implementing `serde::Serialize`
457 ///
458 /// # Errors
459 ///
460 /// Returns `serde_json::Error` if serialization fails.
461 ///
462 /// # Examples
463 ///
464 /// ```rust
465 /// # #[cfg(feature = "json")]
466 /// # {
467 /// use http_kit::Body;
468 /// use serde::Serialize;
469 ///
470 /// #[derive(Serialize)]
471 /// struct User {
472 /// name: String,
473 /// age: u32,
474 /// }
475 ///
476 /// let user = User {
477 /// name: "Alice".to_string(),
478 /// age: 30,
479 /// };
480 ///
481 /// let body = Body::from_json(&user)?;
482 /// # }
483 /// # Ok::<(), serde_json::Error>(())
484 /// ```
485 #[cfg(feature = "json")]
486 pub fn from_json<T: serde::Serialize>(value: T) -> Result<Self, serde_json::Error> {
487 Ok(Self {
488 mime: Some(mime::APPLICATION_JSON),
489 ..Self::from_bytes(serde_json::to_string(&value)?)
490 })
491 }
492
493 #[cfg(feature = "fs")]
494 fn guess(extension: &[u8]) -> Option<&'static str> {
495 let s = core::str::from_utf8(extension).ok()?;
496 mime_guess::from_ext(s).first_raw()
497 }
498
499 /// Creates a body by serializing an object to URL-encoded form data.
500 ///
501 /// This method serializes any `Serialize` type to `application/x-www-form-urlencoded`
502 /// format, commonly used for HTML form submissions.
503 ///
504 /// By default, the MIME type is set to `application/x-www-form-urlencoded`.
505 ///
506 /// # Arguments
507 ///
508 /// * `value` - Any type implementing `serde::Serialize`
509 ///
510 /// # Errors
511 ///
512 /// Returns `serde_urlencoded::ser::Error` if serialization fails.
513 ///
514 /// # Examples
515 ///
516 /// ```rust
517 /// # #[cfg(feature = "form")]
518 /// # {
519 /// use http_kit::Body;
520 /// use serde::Serialize;
521 ///
522 /// #[derive(Serialize)]
523 /// struct LoginForm {
524 /// username: String,
525 /// password: String,
526 /// }
527 ///
528 /// let form = LoginForm {
529 /// username: "user".to_string(),
530 /// password: "pass".to_string(),
531 /// };
532 ///
533 /// let body = Body::from_form(&form)?;
534 /// # }
535 /// # Ok::<(), serde_urlencoded::ser::Error>(())
536 /// ```
537 #[cfg(feature = "form")]
538 pub fn from_form<T: serde::Serialize>(value: T) -> Result<Self, serde_urlencoded::ser::Error> {
539 Ok(Self {
540 mime: Some(mime::APPLICATION_WWW_FORM_URLENCODED),
541 ..Self::from_bytes(serde_urlencoded::to_string(value)?)
542 })
543 }
544
545 /// Creates a body from a stream of Server-Sent Events (SSE).
546 ///
547 /// This method converts a stream of SSE events into a body that can be used
548 /// for HTTP responses. The events are formatted according to the SSE specification
549 /// and can be consumed by EventSource clients.
550 ///
551 /// # Type Parameters
552 ///
553 /// * `S` - Stream type yielding `Result<Event, E>`
554 /// * `E` - Error type that can be converted to a boxed error
555 ///
556 /// # Examples
557 ///
558 /// ```rust
559 /// use http_kit::{Body, sse::Event};
560 /// use futures_lite::stream;
561 ///
562 /// # async fn example() {
563 /// let events = stream::iter(vec![
564 /// Ok::<_, std::io::Error>(Event::from_data("Hello").with_id("1")),
565 /// Ok(Event::from_data("World").with_id("2")),
566 /// ]);
567 ///
568 /// let body = Body::from_sse(events);
569 /// # }
570 /// ```
571 pub fn from_sse<S, E>(s: S) -> Self
572 where
573 S: Stream<Item = Result<Event, E>> + Send + Sync + 'static,
574 E: Into<Error> + Send + Sync + 'static,
575 {
576 Self {
577 mime: Some(mime::TEXT_EVENT_STREAM),
578 inner: BodyInner::HttpBody(Box::pin(
579 crate::sse::into_body(s)
580 .map_frame(|result| result.map_data(|data| data))
581 .map_err(|e| e.into()),
582 )),
583 }
584 }
585
586 /// Returns the MIME type of the body, if known.
587 pub fn mime(&self) -> Option<&Mime> {
588 self.mime.as_ref()
589 }
590
591 /// Sets the MIME type of the body.
592 pub fn with_mime(mut self, mime: Mime) -> Self {
593 self.mime = Some(mime);
594 self
595 }
596
597 /// Returns the length of the body in bytes, if known.
598 ///
599 /// This method returns `Some(length)` for in-memory bodies where the size
600 /// is immediately available. For streaming bodies (files, readers, streams),
601 /// it returns `None` since the total size may not be known until the entire
602 /// body is consumed.
603 ///
604 /// The returned length is primarily used for optimizations like setting
605 /// `Content-Length` headers, but should be considered a hint rather than
606 /// a guarantee.
607 ///
608 /// # Examples
609 ///
610 /// ```rust
611 /// use http_kit::Body;
612 ///
613 /// let body = Body::from_bytes("Hello, world!");
614 /// assert_eq!(body.len(), Some(13));
615 ///
616 /// let empty = Body::empty();
617 /// assert_eq!(empty.len(), Some(0));
618 /// ```
619 pub const fn len(&self) -> Option<usize> {
620 match &self.inner {
621 BodyInner::Once(bytes) => Some(bytes.len()),
622 BodyInner::Reader { length, .. } => *length,
623 _ => None,
624 }
625 }
626
627 /// Returns whether the body is empty, if the length is known.
628 ///
629 /// This method returns `Some(true)` if the body is known to be empty,
630 /// `Some(false)` if the body is known to contain data, and `None` if
631 /// the body length cannot be determined without consuming it.
632 ///
633 /// # Examples
634 ///
635 /// ```rust
636 /// use http_kit::Body;
637 ///
638 /// let empty = Body::empty();
639 /// assert_eq!(empty.is_empty(), Some(true));
640 ///
641 /// let body = Body::from_bytes("data");
642 /// assert_eq!(body.is_empty(), Some(false));
643 /// ```
644 pub const fn is_empty(&self) -> Option<bool> {
645 if let Some(len) = self.len() {
646 if len == 0 {
647 Some(true)
648 } else {
649 Some(false)
650 }
651 } else {
652 None
653 }
654 }
655
656 /// Consumes the body and returns all its data as `Bytes`.
657 ///
658 /// This method reads the entire body into memory and returns it as a
659 /// `Bytes` object. For large bodies or streams, this may consume significant
660 /// memory. For streaming bodies, all data will be read and concatenated.
661 ///
662 /// # Errors
663 ///
664 /// Returns an error if:
665 /// - The body is frozen (already consumed)
666 /// - An I/O error occurs while reading streaming data
667 /// - The underlying stream produces an error
668 ///
669 /// # Examples
670 ///
671 /// ```rust
672 /// use http_kit::Body;
673 ///
674 /// # async fn example() -> Result<(), http_kit::BodyError> {
675 /// let body = Body::from_bytes("Hello, world!");
676 /// let bytes = body.into_bytes().await?;
677 /// assert_eq!(bytes, "Hello, world!");
678 /// # Ok(())
679 /// # }
680 /// ```
681 pub async fn into_bytes(self) -> Result<Bytes, Error> {
682 match self.inner {
683 BodyInner::Once(bytes) => Ok(bytes),
684 BodyInner::Reader { mut reader, length } => {
685 let mut vec = Vec::with_capacity(length.unwrap_or_default());
686 loop {
687 let data = reader.fill_buf().await?;
688 if data.is_empty() {
689 break;
690 } else {
691 let len = data.len();
692 vec.extend_from_slice(data);
693 reader.as_mut().consume(len);
694 }
695 }
696 Ok(vec.into())
697 }
698
699 BodyInner::HttpBody(body) => {
700 let mut body = body.into_data_stream();
701
702 let first = body.try_next().await?.unwrap_or_default();
703 let second = body.try_next().await?;
704 if let Some(second) = second {
705 let remain_size_hint = body.size_hint();
706 let mut vec = Vec::with_capacity(
707 first.len()
708 + second.len()
709 + remain_size_hint.1.unwrap_or(remain_size_hint.0),
710 );
711 vec.extend_from_slice(&first);
712 vec.extend_from_slice(&second);
713 while let Some(data) = body.try_next().await? {
714 vec.extend_from_slice(&data);
715 }
716 Ok(vec.into())
717 } else {
718 Ok(first)
719 }
720 }
721 BodyInner::Freeze => Err(Error::BodyFrozen),
722 }
723 }
724
725 /// Consumes the body and returns its data as a UTF-8 string.
726 ///
727 /// This method reads the entire body into memory and converts it to a
728 /// UTF-8 string, returning a `ByteStr` which provides string-like operations
729 /// while maintaining the underlying byte representation.
730 ///
731 /// # Errors
732 ///
733 /// Returns an error if:
734 /// - The body is frozen (already consumed)
735 /// - An I/O error occurs while reading streaming data
736 /// - The body contains invalid UTF-8 sequences
737 ///
738 /// # Examples
739 ///
740 /// ```rust
741 /// use http_kit::Body;
742 ///
743 /// # async fn example() -> Result<(), http_kit::BodyError> {
744 /// let body = Body::from_bytes("Hello, world!");
745 /// let text = body.into_string().await?;
746 /// assert_eq!(text, "Hello, world!");
747 /// # Ok(())
748 /// # }
749 /// ```
750 pub async fn into_string(self) -> Result<ByteStr, Error> {
751 Ok(ByteStr::from_utf8(self.into_bytes().await?)?)
752 }
753
754 /// Converts the body into an async buffered reader.
755 ///
756 /// This method wraps the body in a type that implements `AsyncBufRead`,
757 /// allowing it to be used anywhere that expects an async reader. This is
758 /// useful for streaming the body data to other async I/O operations.
759 ///
760 /// # Examples
761 ///
762 /// ```rust
763 /// use http_kit::Body;
764 /// use futures_lite::AsyncBufReadExt;
765 ///
766 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
767 /// let body = Body::from_bytes("line1\nline2\nline3");
768 /// let mut reader = body.into_reader();
769 /// let mut line = String::new();
770 /// reader.read_line(&mut line).await?;
771 /// assert_eq!(line, "line1\n");
772 /// # Ok(())
773 /// # }
774 /// ```
775 #[cfg(feature = "std")]
776 pub fn into_reader(self) -> impl AsyncBufRead + Send {
777 IntoAsyncRead::new(self)
778 }
779
780 /// Converts the body into a Server-Sent Events (SSE) stream.
781 ///
782 /// This method transforms the body into a stream of SSE events, which can be used
783 /// to handle eventsource responses in HTTP servers or clients.
784 pub fn into_sse(self) -> SseStream {
785 SseStream::new(self)
786 }
787
788 /// Returns a reference to the body data as bytes.
789 ///
790 /// This method ensures the body data is available as a byte slice and returns
791 /// a reference to it. For streaming bodies, this will consume and buffer all
792 /// data in memory. The body is modified to store the buffered data internally.
793 ///
794 /// # Errors
795 ///
796 /// Returns an error if:
797 /// - The body is frozen (already consumed)
798 /// - An I/O error occurs while reading streaming data
799 ///
800 /// # Examples
801 ///
802 /// ```rust
803 /// use http_kit::Body;
804 ///
805 /// # async fn example() -> Result<(), http_kit::BodyError> {
806 /// let mut body = Body::from_bytes("Hello, world!");
807 /// let bytes = body.as_bytes().await?;
808 /// assert_eq!(bytes, b"Hello, world!");
809 /// # Ok(())
810 /// # }
811 /// ```
812 pub async fn as_bytes(&mut self) -> Result<&[u8], Error> {
813 self.inner = BodyInner::Once(self.take()?.into_bytes().await?);
814 match self.inner {
815 BodyInner::Once(ref bytes) => Ok(bytes),
816 _ => unreachable!(),
817 }
818 }
819
820 /// Returns a reference to the body data as a UTF-8 string slice.
821 ///
822 /// This method ensures the body data is available as a string slice and returns
823 /// a reference to it. For streaming bodies, this will consume and buffer all
824 /// data in memory first. The body is modified to store the buffered data internally.
825 ///
826 /// # Errors
827 ///
828 /// Returns an error if:
829 /// - The body is frozen (already consumed)
830 /// - An I/O error occurs while reading streaming data
831 /// - The body contains invalid UTF-8 sequences
832 ///
833 /// # Examples
834 ///
835 /// ```rust
836 /// use http_kit::Body;
837 ///
838 /// # async fn example() -> Result<(), http_kit::BodyError> {
839 /// let mut body = Body::from_bytes("Hello, world!");
840 /// let text = body.as_str().await?;
841 /// assert_eq!(text, "Hello, world!");
842 /// # Ok(())
843 /// # }
844 /// ```
845 pub async fn as_str(&mut self) -> Result<&str, Error> {
846 let data = self.as_bytes().await?;
847 Ok(core::str::from_utf8(data)?)
848 }
849
850 /// Deserializes the body data as JSON into the specified type.
851 ///
852 /// This method reads the body data and attempts to deserialize it as JSON.
853 /// The deserialization is performed with zero-copy when possible by working
854 /// directly with the buffered byte data.
855 ///
856 /// # Warning
857 ///
858 /// This method does not validate the `Content-Type` header. If you need
859 /// MIME type validation, use `Request::into_json()` or `Response::into_json()`
860 /// instead, which check for the `application/json` content type.
861 ///
862 /// # Errors
863 ///
864 /// Returns an error if:
865 /// - The body is frozen (already consumed)
866 /// - An I/O error occurs while reading streaming data
867 /// - The JSON is malformed or doesn't match the target type
868 ///
869 /// # Examples
870 ///
871 /// ```rust
872 /// # #[cfg(feature = "json")]
873 /// # {
874 /// use http_kit::Body;
875 /// use serde::Deserialize;
876 ///
877 /// #[derive(Deserialize, PartialEq, Debug)]
878 /// struct User {
879 /// name: String,
880 /// age: u32,
881 /// }
882 ///
883 /// # async fn example() -> Result<(), http_kit::BodyError> {
884 /// let json_data = r#"{"name": "Alice", "age": 30}"#;
885 /// let mut body = Body::from_bytes(json_data);
886 /// let user: User = body.into_json().await?;
887 /// assert_eq!(user.name, "Alice");
888 /// # Ok(())
889 /// # }
890 /// # }
891 /// ```
892 #[cfg(feature = "json")]
893 pub async fn into_json<'a, T>(&'a mut self) -> Result<T, Error>
894 where
895 T: serde::Deserialize<'a>,
896 {
897 Ok(serde_json::from_slice(self.as_bytes().await?)?)
898 }
899
900 /// Deserializes the body data as URL-encoded form data into the specified type.
901 ///
902 /// This method reads the body data and attempts to deserialize it as
903 /// `application/x-www-form-urlencoded` data. The deserialization is performed
904 /// with zero-copy when possible by working directly with the buffered byte data.
905 ///
906 /// # Warning
907 ///
908 /// This method does not validate the `Content-Type` header. If you need
909 /// MIME type validation, use `Request::into_form()` or `Response::into_form()`
910 /// instead, which check for the `application/x-www-form-urlencoded` content type.
911 ///
912 /// # Errors
913 ///
914 /// Returns an error if:
915 /// - The body is frozen (already consumed)
916 /// - An I/O error occurs while reading streaming data
917 /// - The form data is malformed or doesn't match the target type
918 ///
919 /// # Examples
920 ///
921 /// ```rust
922 /// # #[cfg(feature = "form")]
923 /// # {
924 /// use http_kit::Body;
925 /// use serde::Deserialize;
926 ///
927 /// #[derive(Deserialize, PartialEq, Debug)]
928 /// struct LoginForm {
929 /// username: String,
930 /// password: String,
931 /// }
932 ///
933 /// # async fn example() -> Result<(), http_kit::BodyError> {
934 /// let form_data = "username=alice&password=secret123";
935 /// let mut body = Body::from_bytes(form_data);
936 /// let form: LoginForm = body.into_form().await?;
937 /// assert_eq!(form.username, "alice");
938 /// # Ok(())
939 /// # }
940 /// # }
941 /// ```
942 #[cfg(feature = "form")]
943 pub async fn into_form<'a, T>(&'a mut self) -> Result<T, Error>
944 where
945 T: serde::Deserialize<'a>,
946 {
947 Ok(serde_urlencoded::from_bytes(self.as_bytes().await?)?)
948 }
949
950 /// Replaces this body with a new body and returns the old body.
951 ///
952 /// This method swaps the current body with the provided body, returning
953 /// the original body value. This can be useful for chaining operations
954 /// or temporarily substituting body content.
955 ///
956 /// # Examples
957 ///
958 /// ```rust
959 /// use http_kit::Body;
960 ///
961 /// let mut body = Body::from_bytes("original");
962 /// let old_body = body.replace(Body::from_bytes("replacement"));
963 ///
964 /// // `body` now contains "replacement"
965 /// // `old_body` contains "original"
966 /// ```
967 pub fn replace(&mut self, body: Body) -> Body {
968 replace(self, body)
969 }
970
971 /// Swaps the contents of this body with another body.
972 ///
973 /// This method exchanges the contents of two bodies, provided that this
974 /// body is not frozen. If the body is frozen (already consumed), the
975 /// operation fails and returns an error.
976 ///
977 /// # Errors
978 ///
979 /// Returns `BodyFrozen` if this body has been frozen/consumed.
980 ///
981 /// # Examples
982 ///
983 /// ```rust
984 /// use http_kit::Body;
985 ///
986 /// let mut body1 = Body::from_bytes("first");
987 /// let mut body2 = Body::from_bytes("second");
988 ///
989 /// body1.swap(&mut body2)?;
990 ///
991 /// // Now body1 contains "second" and body2 contains "first"
992 /// # Ok::<(), http_kit::BodyError>(())
993 /// ```
994 pub fn swap(&mut self, body: &mut Body) -> Result<(), BodyFrozen> {
995 if self.is_frozen() {
996 Err(BodyFrozen::new())
997 } else {
998 swap(self, body);
999 Ok(())
1000 }
1001 }
1002
1003 /// Consumes and takes the body, leaving a frozen body in its place.
1004 ///
1005 /// This method extracts the body content and replaces it with a frozen
1006 /// (unusable) body. This is useful when you need to move the body to
1007 /// another location while ensuring the original cannot be used again.
1008 ///
1009 /// # Errors
1010 ///
1011 /// Returns `BodyFrozen` if the body is already frozen.
1012 ///
1013 /// # Examples
1014 ///
1015 /// ```rust
1016 /// use http_kit::Body;
1017 ///
1018 /// let mut body = Body::from_bytes("Hello, world!");
1019 /// let taken_body = body.take()?;
1020 ///
1021 /// // `taken_body` contains the original data
1022 /// // `body` is now frozen and cannot be used
1023 /// assert!(body.is_frozen());
1024 /// # Ok::<(), http_kit::BodyError>(())
1025 /// ```
1026 pub fn take(&mut self) -> Result<Self, BodyFrozen> {
1027 if self.is_frozen() {
1028 Err(BodyFrozen::new())
1029 } else {
1030 Ok(self.replace(Self::frozen()))
1031 }
1032 }
1033
1034 /// Returns `true` if the body is frozen (consumed), `false` otherwise.
1035 ///
1036 /// A frozen body is one that has been consumed by operations like `take()`
1037 /// and can no longer provide data. This is different from an empty body,
1038 /// which still has a valid state but contains no data.
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```rust
1043 /// use http_kit::Body;
1044 ///
1045 /// let mut body = Body::from_bytes("data");
1046 /// assert!(!body.is_frozen());
1047 ///
1048 /// let _taken = body.take().unwrap();
1049 /// assert!(body.is_frozen());
1050 ///
1051 /// let frozen = Body::frozen();
1052 /// assert!(frozen.is_frozen());
1053 /// ```
1054 pub const fn is_frozen(&self) -> bool {
1055 matches!(self.inner, BodyInner::Freeze)
1056 }
1057
1058 /// Freezes the body, making it unusable and dropping its content.
1059 ///
1060 /// This method converts the body to a frozen state, discarding any data
1061 /// it contained. After freezing, the body cannot be used for any operations
1062 /// and will return errors if accessed.
1063 ///
1064 /// # Examples
1065 ///
1066 /// ```rust
1067 /// use http_kit::Body;
1068 ///
1069 /// let mut body = Body::from_bytes("Hello, world!");
1070 /// body.freeze();
1071 ///
1072 /// assert!(body.is_frozen());
1073 /// // Any further operations on `body` will fail
1074 /// ```
1075 pub fn freeze(&mut self) {
1076 self.replace(Self::frozen());
1077 }
1078}
1079
1080impl Default for Body {
1081 fn default() -> Self {
1082 Self::empty()
1083 }
1084}
1085
1086impl Stream for Body {
1087 type Item = Result<Bytes, Error>;
1088
1089 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1090 match &mut self.inner {
1091 BodyInner::Once(bytes) => {
1092 if bytes.is_empty() {
1093 Poll::Ready(None)
1094 } else {
1095 Poll::Ready(Some(Ok(take(bytes))))
1096 }
1097 }
1098 BodyInner::Reader { reader, length } => {
1099 let data = ready!(reader.as_mut().poll_fill_buf(cx))?;
1100 if data.is_empty() {
1101 return Poll::Ready(None);
1102 }
1103 let data = Bytes::copy_from_slice(data);
1104 reader.as_mut().consume(data.len());
1105 if let Some(known_length) = length {
1106 *known_length = known_length.saturating_sub(data.len());
1107 }
1108 Poll::Ready(Some(Ok(data)))
1109 }
1110 BodyInner::HttpBody(stream) => stream
1111 .as_mut()
1112 .poll_frame(cx)
1113 .map_ok(|frame| frame.into_data().unwrap_or_default()),
1114 BodyInner::Freeze => Poll::Ready(Some(Err(Error::BodyFrozen))),
1115 }
1116 }
1117
1118 fn size_hint(&self) -> (usize, Option<usize>) {
1119 match &self.inner {
1120 BodyInner::Once(bytes) => (bytes.len(), Some(bytes.len())),
1121 BodyInner::Reader { length, .. } => (0, *length),
1122 BodyInner::HttpBody(body) => {
1123 let hint = body.size_hint();
1124 (hint.lower() as usize, hint.upper().map(|u| u as usize))
1125 }
1126 BodyInner::Freeze => (0, None),
1127 }
1128 }
1129}
1130
1131impl http_body::Body for Body {
1132 type Data = Bytes;
1133
1134 type Error = Error;
1135
1136 fn poll_frame(
1137 self: Pin<&mut Self>,
1138 cx: &mut Context<'_>,
1139 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1140 self.poll_next(cx)
1141 .map(|opt| opt.map(|result| result.map(http_body::Frame::data)))
1142 .map_err(Error::from)
1143 }
1144}
1145
1146#[cfg(test)]
1147mod tests {
1148 use super::*;
1149 use alloc::string::ToString;
1150 use alloc::vec;
1151 use futures_lite::{stream, StreamExt};
1152
1153 #[tokio::test]
1154 async fn basic_body_operations() {
1155 let empty = Body::empty();
1156 assert_eq!(empty.len(), Some(0));
1157 assert_eq!(empty.is_empty(), Some(true));
1158 assert!(!empty.is_frozen());
1159
1160 let text_body = Body::from_bytes("Hello, World!");
1161 assert_eq!(text_body.len(), Some(13));
1162 assert_eq!(text_body.is_empty(), Some(false));
1163
1164 let result = text_body.into_bytes().await.unwrap();
1165 assert_eq!(result.as_ref(), b"Hello, World!");
1166 }
1167
1168 #[tokio::test]
1169 async fn body_freeze_and_take() {
1170 let mut body = Body::from_bytes("test data");
1171 assert!(!body.is_frozen());
1172
1173 let taken = Body::take(&mut body).unwrap();
1174 assert!(body.is_frozen());
1175
1176 let data = taken.into_bytes().await.unwrap();
1177 assert_eq!(data.as_ref(), b"test data");
1178
1179 let result = body.into_bytes().await;
1180 assert!(result.is_err());
1181 }
1182
1183 #[tokio::test]
1184 async fn body_conversions() {
1185 let vec_data = vec![1, 2, 3, 4, 5];
1186 let body = Body::from(vec_data.clone());
1187 let result = body.into_bytes().await.unwrap();
1188 assert_eq!(result.as_ref(), vec_data.as_slice());
1189
1190 let str_data = "string conversion test";
1191 let body = Body::from(str_data);
1192 let result = body.into_string().await.unwrap();
1193 assert_eq!(result.as_str(), str_data);
1194
1195 let string_data = "owned string test".to_string();
1196 let expected = string_data.clone();
1197 let body = Body::from(string_data);
1198 let result = body.into_string().await.unwrap();
1199 assert_eq!(result.as_str(), expected);
1200
1201 let slice_data: &[u8] = &[6, 7, 8, 9, 10];
1202 let body = Body::from(slice_data);
1203 let result = body.into_bytes().await.unwrap();
1204 assert_eq!(result.as_ref(), slice_data);
1205 }
1206
1207 #[tokio::test]
1208 async fn body_stream_yields_bytes() {
1209 let body = Body::from_bytes("streaming test data");
1210 let mut chunks = Vec::new();
1211
1212 let mut stream = body;
1213 while let Some(chunk_result) = stream.next().await {
1214 let chunk = chunk_result.unwrap();
1215 chunks.push(chunk);
1216 }
1217
1218 assert_eq!(chunks.len(), 1);
1219 assert_eq!(chunks[0].as_ref(), b"streaming test data");
1220 }
1221
1222 #[cfg(feature = "json")]
1223 #[tokio::test]
1224 async fn json_roundtrip() {
1225 use alloc::string::{String, ToString};
1226 use serde::{Deserialize, Serialize};
1227
1228 #[derive(Serialize, Deserialize, PartialEq, Debug)]
1229 struct TestData {
1230 message: String,
1231 count: u32,
1232 }
1233
1234 let data = TestData {
1235 message: "JSON test".to_string(),
1236 count: 42,
1237 };
1238
1239 let body = Body::from_json(&data).unwrap();
1240 let json_str = body.into_string().await.unwrap();
1241 assert!(json_str.contains("JSON test"));
1242 assert!(json_str.contains("42"));
1243
1244 let mut body = Body::from_json(&data).unwrap();
1245 let parsed: TestData = body.into_json().await.unwrap();
1246 assert_eq!(parsed, data);
1247 }
1248
1249 #[cfg(feature = "form")]
1250 #[tokio::test]
1251 async fn form_roundtrip() {
1252 use alloc::string::String;
1253 use serde::{Deserialize, Serialize};
1254
1255 #[derive(Serialize, Deserialize, PartialEq, Debug)]
1256 struct FormData {
1257 name: String,
1258 age: u32,
1259 }
1260
1261 let data = FormData {
1262 name: "Alice".to_string(),
1263 age: 30,
1264 };
1265
1266 let body = Body::from_form(&data).unwrap();
1267 let form_str = body.into_string().await.unwrap();
1268 assert!(form_str.contains("name=Alice"));
1269 assert!(form_str.contains("age=30"));
1270
1271 let mut body = Body::from_form(&data).unwrap();
1272 let parsed: FormData = body.into_form().await.unwrap();
1273 assert_eq!(parsed, data);
1274 }
1275
1276 #[tokio::test]
1277 async fn reader_does_not_hang() {
1278 use futures_lite::io::{BufReader, Cursor};
1279
1280 let data = "This test ensures the reader doesn't create infinite loops";
1281 let cursor = Cursor::new(data.as_bytes().to_vec());
1282 let reader = BufReader::new(cursor);
1283
1284 let body = Body::from_reader(reader, data.len());
1285
1286 let result = body.into_bytes().await.unwrap();
1287 assert_eq!(result.as_ref(), data.as_bytes());
1288 }
1289
1290 #[tokio::test]
1291 async fn sse_body_creation_sets_mime() {
1292 let events = stream::iter(vec![
1293 Ok::<_, Box<dyn core::error::Error + Send + Sync>>(
1294 crate::sse::Event::from_data("test data").with_id("1"),
1295 ),
1296 Ok(crate::sse::Event::from_data("more data").with_id("2")),
1297 ]);
1298
1299 let body = Body::from_sse(events);
1300 assert_eq!(
1301 body.mime().as_ref().map(|m| m.as_ref()),
1302 Some("text/event-stream")
1303 );
1304 }
1305
1306 #[tokio::test]
1307 async fn body_as_str_and_bytes() {
1308 let mut body = Body::from_bytes("test string");
1309
1310 let bytes_ref = body.as_bytes().await.unwrap();
1311 assert_eq!(bytes_ref, b"test string");
1312
1313 let bytes_ref2 = body.as_bytes().await.unwrap();
1314 assert_eq!(bytes_ref2, b"test string");
1315
1316 let mut body2 = Body::from_bytes("test string");
1317 let str_ref = body2.as_str().await.unwrap();
1318 assert_eq!(str_ref, "test string");
1319
1320 let mut invalid_body = Body::from_bytes(vec![0xFF, 0xFE, 0xFD]);
1321 let result = invalid_body.as_str().await;
1322 assert!(result.is_err());
1323 }
1324
1325 #[tokio::test]
1326 async fn body_replace_and_swap() {
1327 let mut body = Body::from_bytes("original");
1328 let old_body = body.replace(Body::from_bytes("replacement"));
1329
1330 let new_data = body.into_bytes().await.unwrap();
1331 let old_data = old_body.into_bytes().await.unwrap();
1332
1333 assert_eq!(new_data.as_ref(), b"replacement");
1334 assert_eq!(old_data.as_ref(), b"original");
1335
1336 let mut body1 = Body::from_bytes("first");
1337 let mut body2 = Body::from_bytes("second");
1338
1339 Body::swap(&mut body1, &mut body2).unwrap();
1340
1341 let data1 = body1.into_bytes().await.unwrap();
1342 let data2 = body2.into_bytes().await.unwrap();
1343
1344 assert_eq!(data1.as_ref(), b"second");
1345 assert_eq!(data2.as_ref(), b"first");
1346
1347 let mut frozen_body = Body::frozen();
1348 let mut normal_body = Body::from_bytes("test");
1349 let result = Body::swap(&mut frozen_body, &mut normal_body);
1350 assert!(result.is_err());
1351 }
1352
1353 #[tokio::test]
1354 async fn body_freeze() {
1355 let mut body = Body::from_bytes("test");
1356 assert!(!body.is_frozen());
1357
1358 body.freeze();
1359 assert!(body.is_frozen());
1360
1361 let result = body.into_bytes().await;
1362 assert!(result.is_err());
1363 }
1364
1365 #[tokio::test]
1366 async fn mime_types() {
1367 let empty = Body::empty();
1368 assert!(empty.mime().is_none());
1369
1370 #[cfg(feature = "json")]
1371 {
1372 use serde::Serialize;
1373 #[derive(Serialize)]
1374 struct Data {
1375 val: i32,
1376 }
1377 let body = Body::from_json(&Data { val: 1 }).unwrap();
1378 assert_eq!(body.mime().unwrap().as_ref(), "application/json");
1379 }
1380
1381 #[cfg(feature = "form")]
1382 {
1383 use serde::Serialize;
1384 #[derive(Serialize)]
1385 struct Data {
1386 val: i32,
1387 }
1388 let body = Body::from_form(&Data { val: 1 }).unwrap();
1389 assert_eq!(
1390 body.mime().unwrap().as_ref(),
1391 "application/x-www-form-urlencoded"
1392 );
1393 }
1394 }
1395
1396 #[cfg(all(feature = "fs", feature = "std"))]
1397 #[tokio::test]
1398 async fn file_body_with_mime() {
1399 use std::io::Write;
1400
1401 let dir = std::env::temp_dir();
1402 let file_path = dir.join("test_mime.html");
1403 let mut file = std::fs::File::create(&file_path).unwrap();
1404 file.write_all(b"<html></html>").unwrap();
1405
1406 let body = Body::from_file(&file_path).await.unwrap();
1407
1408 assert_eq!(body.mime().unwrap().as_ref(), "text/html");
1409
1410 let _ = std::fs::remove_file(file_path);
1411 }
1412}