spacegate_kernel/
body.rs

1// a read only stream reader with some side effect.
2pub mod observer;
3use crate::BoxError;
4use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
5use hyper::body::{Body, Bytes};
6
7use crate::utils::never;
8
9/// # Clone
10/// Always clone after ensure the body is dumped
11#[derive(Debug)]
12pub struct SgBody {
13    pub(crate) body: BoxBody<Bytes, BoxError>,
14    pub(crate) dump: Option<Bytes>,
15}
16
17impl Default for SgBody {
18    fn default() -> Self {
19        Self::empty()
20    }
21}
22
23impl Body for SgBody {
24    type Data = Bytes;
25    type Error = BoxError;
26
27    fn poll_frame(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
28        let mut pinned = std::pin::pin!(&mut self.body);
29        pinned.as_mut().poll_frame(cx)
30    }
31
32    fn is_end_stream(&self) -> bool {
33        self.body.is_end_stream()
34    }
35
36    fn size_hint(&self) -> hyper::body::SizeHint {
37        self.body.size_hint()
38    }
39}
40
41impl SgBody {
42    pub fn new<E>(body: impl Body<Data = Bytes, Error = E> + Send + Sync + 'static) -> Self
43    where
44        E: Into<BoxError> + 'static,
45    {
46        Self {
47            body: BoxBody::new(body.map_err(E::into)),
48            dump: None,
49        }
50    }
51    pub fn empty() -> Self {
52        Self {
53            body: BoxBody::new(Empty::new().map_err(never)),
54            dump: None,
55        }
56    }
57    pub fn full(data: impl Into<Bytes>) -> Self {
58        let bytes = data.into();
59        Self {
60            body: BoxBody::new(Full::new(bytes.clone()).map_err(never)),
61            dump: Some(bytes),
62        }
63    }
64    pub fn is_dumped(&self) -> bool {
65        self.dump.is_some()
66    }
67    /// # Errors
68    /// fail to collect body chunks
69    pub async fn dump(self) -> Result<Self, BoxError> {
70        let bytes = self.body.collect().await?.to_bytes();
71        Ok(Self {
72            body: BoxBody::new(Full::new(bytes.clone()).map_err(never)),
73            dump: Some(bytes),
74        })
75    }
76    pub fn dump_clone(&self) -> Option<Self> {
77        self.dump.as_ref().map(|bytes| Self {
78            body: BoxBody::new(Full::new(bytes.clone()).map_err(never)),
79            dump: Some(bytes.clone()),
80        })
81    }
82    pub fn get_dumped(&self) -> Option<&Bytes> {
83        self.dump.as_ref()
84    }
85}
86
87impl Clone for SgBody {
88    fn clone(&self) -> Self {
89        if let Some(dump) = self.dump_clone() {
90            dump
91        } else {
92            panic!("SgBody can't be cloned before dump")
93        }
94    }
95}