use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use rama_core::bytes::{Buf, Bytes};
use rama_core::error::BoxError;
use rama_core::futures::ready;
use rama_json::capture::{CaptureHandler, JsonCapturer};
use rama_json::path::JsonPath;
use rama_json::tokenizer::DEFAULT_MAX_BUFFERED_BYTES;
use crate::body::{Frame, SizeHint, StreamingBody};
type OnEnd<H> = Box<dyn FnOnce(H) + Send + Sync>;
pin_project! {
pub struct JsonCaptureBody<B, H> {
#[pin]
inner: B,
capturer: Option<JsonCapturer<H>>,
on_end: Option<OnEnd<H>>,
done: bool,
}
}
impl<B, H> JsonCaptureBody<B, H>
where
H: CaptureHandler,
{
pub fn new(
inner: B,
selectors: impl IntoIterator<Item = JsonPath>,
max_capture_bytes: usize,
handler: H,
) -> Self {
Self::with_max_buffered_bytes(
inner,
selectors,
max_capture_bytes,
DEFAULT_MAX_BUFFERED_BYTES,
handler,
)
}
pub fn with_max_buffered_bytes(
inner: B,
selectors: impl IntoIterator<Item = JsonPath>,
max_capture_bytes: usize,
max_buffered_bytes: usize,
handler: H,
) -> Self {
Self {
inner,
capturer: Some(JsonCapturer::with_max_buffered_bytes(
selectors,
max_capture_bytes,
max_buffered_bytes,
handler,
)),
on_end: None,
done: false,
}
}
}
impl<B, H> JsonCaptureBody<B, H> {
pub fn passthrough(inner: B) -> Self {
Self {
inner,
capturer: None,
on_end: None,
done: false,
}
}
#[must_use]
pub fn on_end<F>(mut self, on_end: F) -> Self
where
F: FnOnce(H) + Send + Sync + 'static,
{
self.on_end = Some(Box::new(on_end));
self
}
}
fn fire_on_end<H: CaptureHandler>(
capturer: &mut Option<JsonCapturer<H>>,
on_end: &mut Option<OnEnd<H>>,
) {
if let (Some(capturer), Some(on_end)) = (capturer.take(), on_end.take()) {
on_end(capturer.into_handler());
}
}
impl<B, H> StreamingBody for JsonCaptureBody<B, H>
where
B: StreamingBody<Error: Into<BoxError>>,
H: CaptureHandler,
{
type Data = Bytes;
type Error = BoxError;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();
if *this.done {
return Poll::Ready(None);
}
let Some(capturer) = this.capturer.as_mut() else {
return match ready!(this.inner.as_mut().poll_frame(cx)) {
Some(Ok(frame)) => Poll::Ready(Some(Ok(normalize_frame(frame)))),
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => {
*this.done = true;
Poll::Ready(None)
}
};
};
match ready!(this.inner.as_mut().poll_frame(cx)) {
Some(Ok(frame)) => match frame.into_data() {
Ok(mut data) => {
let bytes = data.copy_to_bytes(data.remaining());
if let Err(err) = capturer.write(&bytes) {
return Poll::Ready(Some(Err(err.into())));
}
Poll::Ready(Some(Ok(Frame::data(bytes))))
}
Err(frame) => match frame.into_trailers() {
Ok(trailers) => {
if let Err(err) = capturer.end() {
return Poll::Ready(Some(Err(err.into())));
}
fire_on_end(this.capturer, this.on_end);
*this.done = true;
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
}
Err(_) => Poll::Ready(Some(Ok(Frame::data(Bytes::new())))),
},
},
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => {
*this.done = true;
if let Err(err) = capturer.end() {
return Poll::Ready(Some(Err(err.into())));
}
fire_on_end(this.capturer, this.on_end);
Poll::Ready(None)
}
}
}
fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}
fn normalize_frame<D: Buf>(frame: Frame<D>) -> Frame<Bytes> {
match frame.into_data() {
Ok(mut data) => Frame::data(data.copy_to_bytes(data.remaining())),
Err(frame) => match frame.into_trailers() {
Ok(trailers) => Frame::trailers(trailers),
Err(_) => Frame::data(Bytes::new()),
},
}
}