use std::sync::atomic::{AtomicUsize, Ordering};
use bytes::Bytes;
use fancy_log::{LogLevel, log};
use http::{HeaderMap, Response};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::upgrade::OnUpgrade;
use tokio::sync::oneshot;
use crate::common::{
config::env_loader,
sys::lifecycle::{Error, Result},
};
use crate::layers::l7::{
http::{protocol_data::HttpProtocolData, wrapper::VaneBody},
protocol_data::ProtocolData,
};
use crate::resources::kv::KvStore;
pub static GLOBAL_L7_BUFFERED_BYTES: AtomicUsize = AtomicUsize::new(0);
pub static CURRENT_MEMORY_LIMIT: AtomicUsize = AtomicUsize::new(536_870_912);
pub fn update_memory_limit(new_limit: usize) {
CURRENT_MEMORY_LIMIT.store(new_limit, Ordering::Relaxed);
}
pub fn try_reserve_buffer_memory(amount: usize) -> bool {
let limit = CURRENT_MEMORY_LIMIT.load(Ordering::Relaxed);
let current = GLOBAL_L7_BUFFERED_BYTES.load(Ordering::Relaxed);
if current + amount > limit {
log(
LogLevel::Warn,
&format!(
"🛡 Security: L7 Global Buffer Limit reached! Denying allocation of {amount} bytes (Used: {current}/{limit})"
),
);
return false;
}
GLOBAL_L7_BUFFERED_BYTES.fetch_add(amount, Ordering::Relaxed);
true
}
pub fn release_buffer_memory(amount: usize) {
GLOBAL_L7_BUFFERED_BYTES.fetch_sub(amount, Ordering::Relaxed);
}
#[derive(Debug)]
pub struct BufferGuard {
size: usize,
}
impl BufferGuard {
#[must_use]
pub fn new(size: usize) -> Self {
Self { size }
}
}
impl Drop for BufferGuard {
fn drop(&mut self) {
if self.size > 0 {
release_buffer_memory(self.size);
}
}
}
#[derive(Debug)]
pub enum PayloadState {
Http(VaneBody),
Generic,
Buffered(Bytes, BufferGuard),
Empty,
}
impl PayloadState {
pub fn new_buffered(bytes: Bytes) -> Result<Self> {
let len = bytes.len();
if !try_reserve_buffer_memory(len) {
return Err(Error::System(
"Global L7 memory limit exceeded. Buffering denied.".to_owned(),
));
}
Ok(Self::Buffered(bytes, BufferGuard::new(len)))
}
async fn force_buffer(&mut self) -> Result<&Bytes> {
let max_len_str = env_loader::get_env("L7_MAX_BUFFER_SIZE", "10485760".to_owned()); let max_len = max_len_str.parse::<usize>().unwrap_or(10485760);
let current_state = std::mem::replace(self, Self::Empty);
match current_state {
Self::Http(body) => {
let size_hint = body.size_hint().lower() as usize;
if size_hint > max_len {
*self = Self::Http(body); return Err(Error::System(format!(
"Payload size hint too large: {size_hint} > {max_len}"
)));
}
let collected = body
.collect()
.await
.map_err(|e| Error::System(format!("Failed to buffer Vane body: {e}")))?;
let bytes = collected.to_bytes();
let actual_len = bytes.len();
if actual_len > max_len {
return Err(Error::System(format!(
"Actual payload too large to buffer: {actual_len} > {max_len}"
)));
}
*self = Self::new_buffered(bytes)?;
}
Self::Buffered(bytes, guard) => {
*self = Self::Buffered(bytes, guard);
}
Self::Generic => {
*self = Self::new_buffered(Bytes::new())?;
}
Self::Empty => {
*self = Self::Empty;
}
}
match self {
Self::Buffered(b, _) => Ok(b),
_ => Err(Error::System(
"Internal state inconsistency: payload not buffered after force_buffer".to_owned(),
)),
}
}
}
pub struct Container {
pub kv: KvStore,
pub request_headers: HeaderMap,
pub request_body: PayloadState,
pub response_headers: HeaderMap,
pub response_body: PayloadState,
pub response_tx: Option<oneshot::Sender<Response<()>>>,
pub protocol_data: Option<Box<dyn ProtocolData>>,
}
impl Container {
pub fn new(
kv: KvStore,
request_headers: HeaderMap,
request_body: PayloadState,
response_headers: HeaderMap,
response_body: PayloadState,
response_tx: Option<oneshot::Sender<Response<()>>>,
) -> Self {
Self {
kv,
request_headers,
request_body,
response_headers,
response_body,
response_tx,
protocol_data: None,
}
}
pub fn new_with_http(
kv: KvStore,
request_headers: HeaderMap,
request_body: PayloadState,
response_headers: HeaderMap,
response_body: PayloadState,
response_tx: Option<oneshot::Sender<Response<()>>>,
) -> Self {
let mut container = Self::new(
kv,
request_headers,
request_body,
response_headers,
response_body,
response_tx,
);
container.protocol_data = Some(Box::new(HttpProtocolData::new()));
container
}
pub fn http_data(&self) -> Option<&HttpProtocolData> {
self
.protocol_data
.as_ref()?
.as_any()
.downcast_ref::<HttpProtocolData>()
}
pub fn http_data_mut(&mut self) -> Option<&mut HttpProtocolData> {
self
.protocol_data
.as_mut()?
.as_any_mut()
.downcast_mut::<HttpProtocolData>()
}
#[deprecated(
since = "0.6.9",
note = "Use container.http_data()?.client_upgrade to access this field"
)]
pub fn get_client_upgrade(&self) -> Option<&OnUpgrade> {
self.http_data()?.client_upgrade.as_ref()
}
#[deprecated(
since = "0.6.9",
note = "Use container.http_data_mut()?.client_upgrade = Some(...) to set this field"
)]
pub fn set_client_upgrade(&mut self, upgrade: OnUpgrade) {
if let Some(data) = self.http_data_mut() {
data.client_upgrade = Some(upgrade);
}
}
#[deprecated(
since = "0.6.9",
note = "Use container.http_data()?.upstream_upgrade to access this field"
)]
pub fn get_upstream_upgrade(&self) -> Option<&OnUpgrade> {
self.http_data()?.upstream_upgrade.as_ref()
}
#[deprecated(
since = "0.6.9",
note = "Use container.http_data_mut()?.upstream_upgrade = Some(...) to set this field"
)]
pub fn set_upstream_upgrade(&mut self, upgrade: OnUpgrade) {
if let Some(data) = self.http_data_mut() {
data.upstream_upgrade = Some(upgrade);
}
}
pub async fn force_buffer_request(&mut self) -> Result<&Bytes> {
self.request_body.force_buffer().await
}
pub async fn force_buffer_response(&mut self) -> Result<&Bytes> {
self.response_body.force_buffer().await
}
}