rama-http 0.3.0-rc1

rama http layers, services and other utilities
use super::JsonCaptureBody;
use crate::{
    Body,
    body::{
        Frame,
        util::{BodyExt, StreamBody},
    },
};
use parking_lot::Mutex;
use rama_core::bytes::Bytes;
use rama_core::futures::stream;
use rama_json::capture::{CaptureHandler, CaptureResult, CapturedValue, OwnedCapturedValue};
use rama_json::path::JsonPath;
use std::sync::Arc;

/// Test [`CaptureHandler`] that stores every value it is handed so the tests
/// can inspect the captures after the body has been consumed.
#[derive(Debug, Default)]
struct Recorder {
    /// Owned copies of the captured values, in the order they were handled.
    values: Vec<OwnedCapturedValue>,
}

impl CaptureHandler for Recorder {
    /// Converts the capture into its owned form and appends it to `values`.
    ///
    /// Never rejects a capture: the result is always `Ok(())`.
    fn handle_capture(&mut self, value: CapturedValue<'_>) -> CaptureResult {
        let owned = value.into_owned();
        self.values.push(owned);
        Ok(())
    }
}

/// Builds the [`JsonPath`] made of the single member `name`
/// (the tests below expect it to render as `$.name`).
fn name_path() -> JsonPath {
    const MEMBER: &str = "name";
    JsonPath::builder().member(MEMBER).build()
}

/// Builds the [`JsonPath`] made of the single member `user`
/// (the tests below expect it to render as `$.user`).
fn user_path() -> JsonPath {
    const MEMBER: &str = "user";
    JsonPath::builder().member(MEMBER).build()
}

#[tokio::test]
async fn body_captures_selected_values_and_forwards_unchanged() {
    let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
        Ok(Bytes::from_static(br#"{"name":"#)),
        Ok(Bytes::from_static(br#""Ada","ok":true}"#)),
    ];
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = captured.clone();
    let body = JsonCaptureBody::new(
        Body::from_stream(stream::iter(chunks)),
        [name_path()],
        64,
        Recorder::default(),
    )
    .on_end(move |handler| {
        *sink.lock() = handler.values;
    });

    let out = body.collect().await.expect("collect").to_bytes();
    assert_eq!(&out[..], br#"{"name":"Ada","ok":true}"#);
    let captured = captured.lock();
    assert_eq!(captured.len(), 1);
    assert_eq!(captured[0].path().to_string(), "$.name");
    assert_eq!(captured[0].as_str().as_deref(), Some("Ada"));
}

#[tokio::test]
async fn body_captures_object_subtrees() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = captured.clone();
    let body = JsonCaptureBody::new(
        Body::from(br#"{"user":{"id":7,"name":"Ada"},"ok":true}"#.as_slice()),
        [user_path()],
        128,
        Recorder::default(),
    )
    .on_end(move |handler| {
        *sink.lock() = handler.values;
    });

    let out = body.collect().await.expect("collect").to_bytes();
    assert_eq!(&out[..], br#"{"user":{"id":7,"name":"Ada"},"ok":true}"#);
    let captured = captured.lock();
    assert_eq!(captured.len(), 1);
    assert_eq!(captured[0].path().to_string(), "$.user");
    assert_eq!(
        captured[0].deserialize::<serde_json::Value>().unwrap(),
        serde_json::json!({"id": 7, "name": "Ada"})
    );
}

/// Checks that collecting the body fails when the selected `user` subtree is
/// captured with the small value `8` as the third constructor argument.
///
/// NOTE(review): that argument appears to be the capture size limit, going by
/// the test name and the `expect_err` message; confirm against
/// `JsonCaptureBody::new`.
#[tokio::test]
async fn body_surfaces_capture_limit() {
    const PAYLOAD: &[u8] = br#"{"user":{"id":7,"name":"Ada"}}"#;

    let capture_body = JsonCaptureBody::new(
        Body::from(PAYLOAD),
        [user_path()],
        8,
        Recorder::default(),
    );

    // Only the fact that an error is produced is checked, not its kind.
    let outcome = capture_body.collect().await;
    outcome.expect_err("capture limit should surface as a body error");
}

/// Checks that a trailers frame following the data frame is still delivered by
/// [`JsonCaptureBody`], and that the `name` value has been captured once the
/// body is exhausted.
#[tokio::test]
async fn body_delivers_trailers_after_capture() {
    // Shared slot the `on_end` callback moves the recorder's values into.
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = captured.clone();

    // Inner body: one data frame with the whole JSON document, then trailers.
    let mut trailers = crate::HeaderMap::new();
    trailers.insert("x-done", "yes".parse().expect("header"));
    let frames = [
        Ok::<_, std::io::Error>(Frame::data(Bytes::from_static(br#"{"name":"Ada"}"#))),
        Ok(Frame::trailers(trailers)),
    ];
    let inner = StreamBody::new(stream::iter(frames));
    let mut body = JsonCaptureBody::new(inner, [name_path()], 64, Recorder::default()).on_end(
        move |handler| {
            *sink.lock() = handler.values;
        },
    );

    // First frame: the data, forwarded unchanged.
    let first = body
        .frame()
        .await
        .expect("first frame")
        .expect("first frame ok")
        .into_data()
        .expect("data");
    assert_eq!(&first[..], br#"{"name":"Ada"}"#);

    // Second frame: the trailers from the inner body.
    let second = body
        .frame()
        .await
        .expect("second frame")
        .expect("second frame ok")
        .into_trailers()
        .expect("trailers");
    assert_eq!(second.get("x-done").unwrap(), "yes");

    // After the trailers the body must report end of stream.
    assert!(body.frame().await.is_none());

    // Assert the count before indexing, as the sibling tests do: a missing
    // capture then fails with a readable assertion instead of an
    // index-out-of-bounds panic, and an unexpected extra capture is caught too.
    let captured = captured.lock();
    assert_eq!(captured.len(), 1);
    assert_eq!(captured[0].as_str().as_deref(), Some("Ada"));
}