spacegate-kernel 0.2.0-alpha.4

A library-first, lightweight, high-performance, cloud-native supported API gateway
Documentation
use std::task::Poll;

use crate::BoxError;
use hyper::body::{Body, Bytes, Frame};

use super::SgBody;

pub trait State: Sized + Send + Sync + 'static {
    fn update_bytes(&mut self, data: &Bytes);
    fn finish(self) {}
    fn error(self, _e: &BoxError) {}
}

pin_project_lite::pin_project! {
    pub struct Observer<S> {
        state: Option<S>,
        #[pin]
        inner: SgBody,
    }
}

impl<S: State> Observer<S> {
    pub fn new(state: S, inner: SgBody) -> Self {
        Self { state: Some(state), inner }
    }
    pub fn to_sg_body(self) -> SgBody {
        SgBody::new(self)
    }
}
impl<S> Body for Observer<S>
where
    S: State,
{
    type Data = Bytes;
    type Error = BoxError;
    fn poll_frame(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.project();
        let poll_result = this.inner.poll_frame(cx);
        if let Poll::Ready(ref ready) = poll_result {
            match ready {
                Some(Ok(ref frame)) => {
                    if let Some(data) = frame.data_ref() {
                        if let Some(s) = this.state.as_mut() {
                            s.update_bytes(data)
                        }
                    }
                }
                Some(Err(ref e)) => {
                    if let Some(s) = this.state.take() {
                        s.error(e)
                    }
                }
                None => {
                    if let Some(s) = this.state.take() {
                        s.finish()
                    }
                }
            }
        }
        poll_result
    }
    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }
    fn size_hint(&self) -> hyper::body::SizeHint {
        self.inner.size_hint()
    }
}