use std::cell::UnsafeCell;
use ntex_bytes::{BytesMut, buf::BufMut};
use ntex_service::cfg::{CfgContext, Configuration};
use ntex_util::{time::Millis, time::Seconds};
const DEFAULT_CACHE_SIZE: usize = 128;
const DEFAULT_HIGH: usize = 16 * 1024 - 24;
const DEFAULT_LOW: usize = 512 + 24;
const DEFAULT_HALF: usize = (16 * 1024 - 24) / 2;
thread_local! {
static CACHE: LocalCache = LocalCache::new();
}
#[derive(Debug)]
pub struct IoConfig {
connect_timeout: Millis,
keepalive_timeout: Seconds,
disconnect_timeout: Seconds,
frame_read_rate: Option<FrameReadRate>,
read_buf: BufConfig,
write_buf: BufConfig,
pub(crate) config: CfgContext,
}
impl Default for IoConfig {
fn default() -> Self {
IoConfig::new()
}
}
impl Configuration for IoConfig {
const NAME: &str = "IO Configuration";
fn ctx(&self) -> &CfgContext {
&self.config
}
fn set_ctx(&mut self, ctx: CfgContext) {
self.read_buf.idx = ctx.id();
self.write_buf.idx = ctx.id();
self.config = ctx;
}
}
#[derive(Copy, Clone, Debug)]
pub struct FrameReadRate {
pub timeout: Seconds,
pub max_timeout: Seconds,
pub rate: u32,
}
#[derive(Copy, Clone, Debug)]
pub struct BufConfig {
pub high: usize,
pub low: usize,
pub half: usize,
idx: usize,
first: bool,
cache_size: usize,
}
impl IoConfig {
#[inline]
#[must_use]
pub fn new() -> IoConfig {
let config = CfgContext::default();
let idx = config.id();
IoConfig {
config,
connect_timeout: Millis::ZERO,
keepalive_timeout: Seconds(0),
disconnect_timeout: Seconds(1),
frame_read_rate: None,
read_buf: BufConfig {
idx,
high: DEFAULT_HIGH,
low: DEFAULT_LOW,
half: DEFAULT_HALF,
first: true,
cache_size: DEFAULT_CACHE_SIZE,
},
write_buf: BufConfig {
idx,
high: DEFAULT_HIGH,
low: DEFAULT_LOW,
half: DEFAULT_HALF,
first: false,
cache_size: DEFAULT_CACHE_SIZE,
},
}
}
#[inline]
pub fn tag(&self) -> &str {
self.config.tag()
}
#[inline]
pub fn connect_timeout(&self) -> Millis {
self.connect_timeout
}
#[inline]
pub fn keepalive_timeout(&self) -> Seconds {
self.keepalive_timeout
}
#[inline]
pub fn disconnect_timeout(&self) -> Seconds {
self.disconnect_timeout
}
#[inline]
pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
self.frame_read_rate.as_ref()
}
#[inline]
pub fn read_buf(&self) -> &BufConfig {
&self.read_buf
}
#[inline]
pub fn write_buf(&self) -> &BufConfig {
&self.write_buf
}
#[must_use]
pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.connect_timeout = timeout.into();
self
}
#[must_use]
pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
self.keepalive_timeout = timeout.into();
self
}
#[must_use]
pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
self.disconnect_timeout = timeout.into();
self
}
#[must_use]
pub fn set_frame_read_rate(
mut self,
timeout: Seconds,
max_timeout: Seconds,
rate: u32,
) -> Self {
self.frame_read_rate = Some(FrameReadRate {
timeout,
max_timeout,
rate,
});
self
}
#[must_use]
pub fn set_read_buf(
mut self,
high_watermark: usize,
low_watermark: usize,
cache_size: usize,
) -> Self {
self.read_buf.cache_size = cache_size;
self.read_buf.high = high_watermark;
self.read_buf.low = low_watermark;
self.read_buf.half = high_watermark >> 1;
self
}
#[must_use]
pub fn set_write_buf(
mut self,
high_watermark: usize,
low_watermark: usize,
cache_size: usize,
) -> Self {
self.write_buf.cache_size = cache_size;
self.write_buf.high = high_watermark;
self.write_buf.low = low_watermark;
self.write_buf.half = high_watermark >> 1;
self
}
}
impl BufConfig {
#[inline]
pub fn get(&self) -> BytesMut {
if let Some(mut buf) =
CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
{
buf.clear();
buf
} else {
BytesMut::with_capacity(self.high)
}
}
pub fn buf_with_capacity(&self, cap: usize) -> BytesMut {
BytesMut::with_capacity(cap)
}
#[inline]
pub fn resize(&self, buf: &mut BytesMut) {
if buf.remaining_mut() < self.low {
self.resize_min(buf, self.high);
}
}
#[inline]
pub fn resize_min(&self, buf: &mut BytesMut, size: usize) {
let mut avail = buf.remaining_mut();
if avail < size {
let mut new_cap = buf.capacity();
while avail < size {
avail += self.high;
new_cap += self.high;
}
buf.reserve_capacity(new_cap);
}
}
#[inline]
pub fn release(&self, buf: BytesMut) {
let cap = buf.capacity();
if cap > self.low && cap <= self.high {
CACHE.with(|c| {
c.with(self.idx, self.first, |v: &mut Vec<_>| {
if v.len() < self.cache_size {
v.push(buf);
}
});
});
}
}
}
struct LocalCache {
cache: UnsafeCell<Vec<(Vec<BytesMut>, Vec<BytesMut>)>>,
}
impl LocalCache {
fn new() -> Self {
Self {
cache: UnsafeCell::new(Vec::with_capacity(16)),
}
}
fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
where
F: FnOnce(&mut Vec<BytesMut>) -> R,
{
let cache = unsafe { &mut *self.cache.get() };
while cache.len() <= idx {
cache.push((Vec::new(), Vec::new()));
}
if first {
f(&mut cache[idx].0)
} else {
f(&mut cache[idx].1)
}
}
}