spacegate_kernel/body/
observer.rs

1use std::task::Poll;
2
3use crate::BoxError;
4use hyper::body::{Body, Bytes, Frame};
5
6use super::SgBody;
7
8pub trait State: Sized + Send + Sync + 'static {
9    fn update_bytes(&mut self, data: &Bytes);
10    fn finish(self) {}
11    fn error(self, _e: &BoxError) {}
12}
13
14pin_project_lite::pin_project! {
15    pub struct Observer<S> {
16        state: Option<S>,
17        #[pin]
18        inner: SgBody,
19    }
20}
21
22impl<S: State> Observer<S> {
23    pub fn new(state: S, inner: SgBody) -> Self {
24        Self { state: Some(state), inner }
25    }
26    pub fn to_sg_body(self) -> SgBody {
27        SgBody::new(self)
28    }
29}
30impl<S> Body for Observer<S>
31where
32    S: State,
33{
34    type Data = Bytes;
35    type Error = BoxError;
36    fn poll_frame(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
37        let this = self.project();
38        let poll_result = this.inner.poll_frame(cx);
39        if let Poll::Ready(ref ready) = poll_result {
40            match ready {
41                Some(Ok(ref frame)) => {
42                    if let Some(data) = frame.data_ref() {
43                        if let Some(s) = this.state.as_mut() {
44                            s.update_bytes(data)
45                        }
46                    }
47                }
48                Some(Err(ref e)) => {
49                    if let Some(s) = this.state.take() {
50                        s.error(e)
51                    }
52                }
53                None => {
54                    if let Some(s) = this.state.take() {
55                        s.finish()
56                    }
57                }
58            }
59        }
60        poll_result
61    }
62    fn is_end_stream(&self) -> bool {
63        self.inner.is_end_stream()
64    }
65    fn size_hint(&self) -> hyper::body::SizeHint {
66        self.inner.size_hint()
67    }
68}