spacegate_kernel/body/
observer.rs1use std::task::Poll;
2
3use crate::BoxError;
4use hyper::body::{Body, Bytes, Frame};
5
6use super::SgBody;
7
8pub trait State: Sized + Send + Sync + 'static {
9 fn update_bytes(&mut self, data: &Bytes);
10 fn finish(self) {}
11 fn error(self, _e: &BoxError) {}
12}
13
14pin_project_lite::pin_project! {
15 pub struct Observer<S> {
16 state: Option<S>,
17 #[pin]
18 inner: SgBody,
19 }
20}
21
22impl<S: State> Observer<S> {
23 pub fn new(state: S, inner: SgBody) -> Self {
24 Self { state: Some(state), inner }
25 }
26 pub fn to_sg_body(self) -> SgBody {
27 SgBody::new(self)
28 }
29}
30impl<S> Body for Observer<S>
31where
32 S: State,
33{
34 type Data = Bytes;
35 type Error = BoxError;
36 fn poll_frame(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
37 let this = self.project();
38 let poll_result = this.inner.poll_frame(cx);
39 if let Poll::Ready(ref ready) = poll_result {
40 match ready {
41 Some(Ok(ref frame)) => {
42 if let Some(data) = frame.data_ref() {
43 if let Some(s) = this.state.as_mut() {
44 s.update_bytes(data)
45 }
46 }
47 }
48 Some(Err(ref e)) => {
49 if let Some(s) = this.state.take() {
50 s.error(e)
51 }
52 }
53 None => {
54 if let Some(s) = this.state.take() {
55 s.finish()
56 }
57 }
58 }
59 }
60 poll_result
61 }
62 fn is_end_stream(&self) -> bool {
63 self.inner.is_end_stream()
64 }
65 fn size_hint(&self) -> hyper::body::SizeHint {
66 self.inner.size_hint()
67 }
68}