rama_http/layer/retry/
body.rs

1use bytes::Bytes;
2
/// A body that can be cloned and used for requests that have to be retried.
#[derive(Debug, Clone)]
pub struct RetryBody {
    // `None` when the body was created without a payload, or once
    // `poll_frame` has taken the payload and yielded it as a data frame.
    bytes: Option<Bytes>,
}
8
9impl RetryBody {
10    pub(crate) fn new(bytes: Bytes) -> Self {
11        RetryBody { bytes: Some(bytes) }
12    }
13
14    #[cfg(test)]
15    pub(crate) fn empty() -> Self {
16        RetryBody { bytes: None }
17    }
18
19    /// Turn this body into bytes.
20    pub fn into_bytes(self) -> Option<Bytes> {
21        self.bytes
22    }
23}
24
25impl crate::dep::http_body::Body for RetryBody {
26    type Data = Bytes;
27    type Error = rama_core::error::BoxError;
28
29    fn poll_frame(
30        mut self: std::pin::Pin<&mut Self>,
31        _cx: &mut std::task::Context<'_>,
32    ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
33        std::task::Poll::Ready(
34            self.bytes
35                .take()
36                .map(|bytes| Ok(http_body::Frame::data(bytes))),
37        )
38    }
39
40    fn is_end_stream(&self) -> bool {
41        self.bytes.is_none()
42    }
43
44    fn size_hint(&self) -> http_body::SizeHint {
45        http_body::SizeHint::with_exact(
46            self.bytes
47                .as_ref()
48                .map(|b| b.len() as u64)
49                .unwrap_or_default(),
50        )
51    }
52}
53
54impl From<RetryBody> for crate::Body {
55    fn from(body: RetryBody) -> Self {
56        match body.bytes {
57            Some(bytes) => bytes.into(),
58            None => crate::Body::empty(),
59        }
60    }
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BodyExtractExt;
    // Brings `is_end_stream` and `size_hint` into scope for the sync tests.
    use crate::dep::http_body::Body as _;

    /// A body with a payload can be collected into the original string.
    #[tokio::test]
    async fn consume_retry_body() {
        let body = RetryBody::new(Bytes::from("hello"));
        let s = body.try_into_string().await.unwrap();
        assert_eq!(s, "hello");
    }

    /// A fresh body reports its exact payload size and is not at end of stream.
    #[test]
    fn retry_body_reports_exact_size() {
        let body = RetryBody::new(Bytes::from("hello"));
        assert!(!body.is_end_stream());
        assert_eq!(body.size_hint().exact(), Some(5));
        assert_eq!(body.into_bytes(), Some(Bytes::from("hello")));
    }

    /// A body without payload is already at end of stream with size zero.
    #[test]
    fn empty_retry_body_is_end_of_stream() {
        let body = RetryBody::empty();
        assert!(body.is_end_stream());
        assert_eq!(body.size_hint().exact(), Some(0));
        assert!(body.into_bytes().is_none());
    }
}