shared_http_body/
lib.rs

1//! A library for creating shareable HTTP bodies that can be cloned and consumed by multiple tasks.
2//!
3//! [`SharedBody`] wraps an [`http_body::Body`] to make it cloneable (see *Requirements* below for the
4//! bounds on the wrapped body). All clones share the same underlying body, but each clone tracks its own
5//! position: clones created at the same time see the same frames, while clones created after partial consumption see only the remaining frames.
6//!
7//! # Examples
8//!
9//! ```
10//! use shared_http_body::SharedBody;
11//! use http_body_util::{BodyExt, StreamBody};
12//! use http_body::Frame;
13//! use bytes::Bytes;
14//! use futures_util::stream;
15//!
16//! # tokio_test::block_on(async {
17//! // Create a body from a stream of frames
18//! let chunks = vec!["hello", "world"];
19//! let stream = stream::iter(chunks.into_iter().map(|s| Ok::<_, std::convert::Infallible>(Frame::data(Bytes::from(s)))));
20//! let body = StreamBody::new(stream);
21//! let shared_body = SharedBody::new(body);
22//!
23//! // Clone the body for multiple consumers
24//! let consumer1 = shared_body.clone();
25//! let consumer2 = shared_body.clone();
26//!
27//! // Both consumers will receive all frames
28//! let result1 = consumer1.collect().await.unwrap().to_bytes();
29//! let result2 = consumer2.collect().await.unwrap().to_bytes();
30//!
31//! assert_eq!(result1, Bytes::from("helloworld"));
32//! assert_eq!(result2, Bytes::from("helloworld"));
33//! # });
34//! ```
35//!
36//! # Requirements
37//!
38//! The underlying [`http_body::Body`] type must be [`Unpin`], and both the body's data (`Body::Data`)
39//! and error types (`Body::Error`) must implement [`Clone`].
40//!
41//! # Behavior
42//!
43//! When you clone a [`SharedBody`], the clone will start from the current position
44//! of the body being cloned, not from the beginning of the original data. Each
45//! `SharedBody` maintains its own independent position. This means:
46//!
47//! - Clones created from the same body at the same time will see the same frames
48//! - Clones created after consumption will only see frames remaining from that body's position
49//! - Each clone can be consumed independently and can itself be cloned from its current position
50//!
51//! For example, with a body containing 4 frames:
52//! ```
53//! use shared_http_body::SharedBody;
54//! use http_body_util::{BodyExt, StreamBody};
55//! use http_body::Frame;
56//! use bytes::Bytes;
57//! use futures_util::stream;
58//!
59//! # tokio_test::block_on(async {
60//! let chunks = vec!["frame1", "frame2", "frame3", "frame4"];
61//! let stream = stream::iter(chunks.into_iter().map(|s| Ok::<_, std::convert::Infallible>(Frame::data(Bytes::from(s)))));
62//! let body = StreamBody::new(stream);
63//! let mut original = SharedBody::new(body);
64//!
65//! // Consume the first frame by polling the body directly
66//! use http_body::Body;
67//! let _ = std::pin::Pin::new(&mut original).poll_frame(&mut std::task::Context::from_waker(&futures_util::task::noop_waker()));
68//! // Consume the next frame using the `frame` method from `BodyExt`
69//! let _ = http_body_util::BodyExt::frame(&mut original).await;
70//!
71//! // Clone after consuming 2 frames - clone1 starts at `original`'s current position and will only see the 2 remaining frames
72//! let clone1 = original.clone();
73//!
74//! # });
75//! ```
76//!
77//! # Thread Safety
78//!
79//! `SharedBody` is both [`Send`] and [`Sync`] when the underlying body and its data/error types
80//! are `Send` and `Sync`. This means cloned bodies can be safely moved across threads
81//! and shared between tasks running on different threads.
82//!
83//! ```
84//! use shared_http_body::SharedBody;
85//! use http_body_util::{BodyExt, StreamBody};
86//! use http_body::Frame;
87//! use bytes::Bytes;
88//! use futures_util::stream;
89//! use tokio::task;
90//!
91//! # tokio_test::block_on(async {
92//! let data = vec![Bytes::from("data1"), Bytes::from("data2"), Bytes::from("data3")];
93//! let stream = stream::iter(data.clone().into_iter().map(|b| Ok::<_, std::convert::Infallible>(Frame::data(b))));
94//! let body = StreamBody::new(stream);
95//! let shared_body = SharedBody::new(body);
96//!
97//! // Clone the body and move each clone into its own spawned task (tasks may run on different threads)
98//! let body1 = shared_body.clone();
99//! let body2 = shared_body.clone();
100//!
101//! let handle1 = task::spawn(async move {
102//!     body1.collect().await.unwrap().to_bytes()
103//! });
104//!
105//! let handle2 = task::spawn(async move {
106//!     body2.collect().await.unwrap().to_bytes()
107//! });
108//!
109//! let (result1, result2) = tokio::join!(handle1, handle2);
110//! assert_eq!(result1.unwrap(), Bytes::from("data1data2data3"));
111//! assert_eq!(result2.unwrap(), Bytes::from("data1data2data3"));
112//! # });
113//! ```
114//!
115//! # Use Cases
116//!
117//! `SharedBody` is particularly useful for:
118//!
119//! - **Dark Forwarding**: Clone the body to send it to a secondary destination (e.g., testing in production, shadow traffic)
120//! - **Request Retries**: Keep a clone of the request body for retry attempts on failure
121//! - **Request/Response Logging**: Clone the body to log it while still forwarding to the handler
122//!
123
// Enable the unstable `doc_cfg` rustdoc feature only when the `docsrs` cfg is set,
// which the `doc(cfg(...))` annotation on the `stats` module below relies on.
124#![cfg_attr(docsrs, feature(doc_cfg))]
125
// Private implementation modules; only the items re-exported below are public.
126mod clonable_frame;
127mod ext;
128mod inner;
129mod shared_body;
130
// Public API re-exports.
131pub use crate::shared_body::SharedBody;
132pub use ext::SharedBodyExt;
133
// `stats` is compiled only when the `stats` cargo feature is enabled; under the
// `docsrs` cfg it is additionally tagged with `doc(cfg(feature = "stats"))`.
134#[cfg_attr(docsrs, doc(cfg(feature = "stats")))]
135#[cfg(feature = "stats")]
136pub mod stats;
137
// Imported only under `cfg(doc)`, i.e. when rustdoc builds the crate.
// NOTE(review): presumably kept so `Body` can be referenced from documentation - confirm before removing.
138#[cfg(doc)]
139use http_body::Body;