use bytes::Bytes;
use pingora_cache::lock::{CacheKeyLockImpl, LockStatus, WritePermit};
use pingora_cache::CacheKey;
use pingora_core::protocols::http::subrequest::server::{
HttpSession as SessionSubrequest, SubrequestHandle,
};
use std::any::Any;
pub mod pipe;
struct LockCtx {
write_permit: WritePermit,
cache_lock: &'static CacheKeyLockImpl,
key: CacheKey,
}
pub(crate) struct InputBodyReader(std::vec::IntoIter<Bytes>);
impl InputBodyReader {
pub fn read_body(&mut self) -> Option<Bytes> {
self.0.next()
}
}
pub type UserCtx = Box<dyn Any + Sync + Send>;
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum BodyMode {
#[default]
NoBody,
ExpectBody,
}
#[derive(Default)]
pub struct CtxBuilder {
lock: Option<LockCtx>,
body_mode: BodyMode,
user_ctx: Option<UserCtx>,
}
impl CtxBuilder {
pub fn new() -> Self {
Self {
lock: None,
body_mode: BodyMode::NoBody,
user_ctx: None,
}
}
pub fn cache_write_lock(
mut self,
cache_lock: &'static CacheKeyLockImpl,
key: CacheKey,
write_permit: WritePermit,
) -> Self {
self.lock = Some(LockCtx {
cache_lock,
key,
write_permit,
});
self
}
pub fn user_ctx(mut self, user_ctx: UserCtx) -> Self {
self.user_ctx = Some(user_ctx);
self
}
pub fn body_mode(mut self, body_mode: BodyMode) -> Self {
self.body_mode = body_mode;
self
}
pub fn build(self) -> Ctx {
Ctx {
lock: self.lock,
body_mode: self.body_mode,
user_ctx: self.user_ctx,
}
}
}
pub struct Ctx {
body_mode: BodyMode,
lock: Option<LockCtx>,
user_ctx: Option<UserCtx>,
}
impl Ctx {
pub fn builder() -> CtxBuilder {
CtxBuilder::new()
}
pub fn user_ctx(&self) -> Option<&UserCtx> {
self.user_ctx.as_ref()
}
pub fn user_ctx_mut(&mut self) -> Option<&mut UserCtx> {
self.user_ctx.as_mut()
}
pub fn release_write_lock(&mut self) {
if let Some(lock) = self.lock.take() {
lock.cache_lock
.release(&lock.key, lock.write_permit, LockStatus::TransientError);
}
}
pub fn take_write_lock(&mut self) -> Option<WritePermit> {
self.lock.take().map(|lock| lock.write_permit)
}
pub fn body_mode(&self) -> BodyMode {
self.body_mode
}
}
use crate::HttpSession;
pub(crate) fn create_session(parsed_session: &HttpSession) -> (HttpSession, SubrequestHandle) {
let (session, handle) = SessionSubrequest::new_from_session(parsed_session);
(HttpSession::new_subrequest(session), handle)
}
#[tokio::test]
async fn test_dummy_request() {
use tokio_test::io::Builder;
let input = b"GET / HTTP/1.1\r\n\r\n";
let mock_io = Builder::new().read(&input[..]).build();
let mut req = HttpSession::new_http1(Box::new(mock_io));
req.read_request().await.unwrap();
assert_eq!(input.as_slice(), req.to_h1_raw());
let (mut subreq, _handle) = create_session(&req);
subreq.read_request().await.unwrap();
assert_eq!(input.as_slice(), subreq.to_h1_raw());
}