1use std::cell::UnsafeCell;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use ntex_bytes::{BytesVec, buf::BufMut};
5use ntex_service::cfg::{CfgContext, Configuration};
6use ntex_util::{time::Millis, time::Seconds};
7
8const DEFAULT_CACHE_SIZE: usize = 128;
9static IDX: AtomicUsize = AtomicUsize::new(0);
10
11thread_local! {
12 static CACHE: LocalCache = LocalCache::new();
13}
14
15#[derive(Clone, Debug)]
16pub struct IoConfig {
18 connect_timeout: Millis,
19 keepalive_timeout: Seconds,
20 disconnect_timeout: Seconds,
21 frame_read_rate: Option<FrameReadRate>,
22
23 read_buf: BufConfig,
25 write_buf: BufConfig,
26
27 pub(crate) config: CfgContext,
29}
30
31impl Configuration for IoConfig {
32 const NAME: &str = "IO Configuration";
33
34 fn default() -> &'static Self {
35 thread_local! {
36 static DEFAULT_CFG: &'static IoConfig =
37 Box::leak(Box::new(IoConfig::new()));
38 }
39 DEFAULT_CFG.with(|cfg| *cfg)
40 }
41
42 fn ctx(&self) -> &CfgContext {
43 &self.config
44 }
45
46 fn set_ctx(&mut self, ctx: CfgContext) {
47 self.config = ctx;
48 }
49}
50
51#[derive(Copy, Clone, Debug)]
52pub struct FrameReadRate {
53 pub timeout: Seconds,
54 pub max_timeout: Seconds,
55 pub rate: u32,
56}
57
58#[derive(Copy, Clone, Debug)]
59pub struct BufConfig {
60 pub high: usize,
61 pub low: usize,
62 pub half: usize,
63 idx: usize,
64 cache_size: usize,
65}
66
67impl IoConfig {
68 #[inline]
69 #[allow(clippy::new_without_default)]
70 pub fn new() -> IoConfig {
72 let idx1 = IDX.fetch_add(1, Ordering::SeqCst);
73 let idx2 = IDX.fetch_add(1, Ordering::SeqCst);
74 IoConfig {
75 connect_timeout: Millis::ZERO,
76 keepalive_timeout: Seconds(0),
77 disconnect_timeout: Seconds(1),
78 frame_read_rate: None,
79
80 read_buf: BufConfig {
81 high: 16 * 1024,
82 low: 1024,
83 half: 8 * 1024,
84 idx: idx1,
85 cache_size: DEFAULT_CACHE_SIZE,
86 },
87 write_buf: BufConfig {
88 high: 16 * 1024,
89 low: 1024,
90 half: 8 * 1024,
91 idx: idx2,
92 cache_size: DEFAULT_CACHE_SIZE,
93 },
94 config: CfgContext::default(),
95 }
96 }
97
98 #[inline]
99 pub fn tag(&self) -> &str {
101 self.config.tag()
102 }
103
104 #[inline]
105 pub fn connect_timeout(&self) -> Millis {
107 self.connect_timeout
108 }
109
110 #[inline]
111 pub fn keepalive_timeout(&self) -> Seconds {
113 self.keepalive_timeout
114 }
115
116 #[inline]
117 pub fn disconnect_timeout(&self) -> Seconds {
119 self.disconnect_timeout
120 }
121
122 #[inline]
123 pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
125 self.frame_read_rate.as_ref()
126 }
127
128 #[inline]
129 pub fn read_buf(&self) -> &BufConfig {
131 &self.read_buf
132 }
133
134 #[inline]
135 pub fn write_buf(&self) -> &BufConfig {
137 &self.write_buf
138 }
139
140 pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
146 self.connect_timeout = timeout.into();
147 self
148 }
149
150 pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
156 self.keepalive_timeout = timeout.into();
157 self
158 }
159
160 pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
169 self.disconnect_timeout = timeout.into();
170 self
171 }
172
173 pub fn set_frame_read_rate(
181 mut self,
182 timeout: Seconds,
183 max_timeout: Seconds,
184 rate: u32,
185 ) -> Self {
186 self.frame_read_rate = Some(FrameReadRate {
187 timeout,
188 max_timeout,
189 rate,
190 });
191 self
192 }
193
194 pub fn set_read_buf(
198 mut self,
199 high_watermark: usize,
200 low_watermark: usize,
201 cache_size: usize,
202 ) -> Self {
203 self.read_buf.cache_size = cache_size;
204 self.read_buf.high = high_watermark;
205 self.read_buf.low = low_watermark;
206 self.read_buf.half = high_watermark >> 1;
207 self
208 }
209
210 pub fn set_write_buf(
214 mut self,
215 high_watermark: usize,
216 low_watermark: usize,
217 cache_size: usize,
218 ) -> Self {
219 self.write_buf.cache_size = cache_size;
220 self.write_buf.high = high_watermark;
221 self.write_buf.low = low_watermark;
222 self.write_buf.half = high_watermark >> 1;
223 self
224 }
225}
226
227impl BufConfig {
228 #[inline]
229 pub fn get(&self) -> BytesVec {
231 if let Some(mut buf) = CACHE.with(|c| c.with(self.idx, |c: &mut Vec<_>| c.pop())) {
232 buf.clear();
233 buf
234 } else {
235 BytesVec::with_capacity(self.high)
236 }
237 }
238
239 pub fn buf_with_capacity(&self, cap: usize) -> BytesVec {
241 BytesVec::with_capacity(cap)
242 }
243
244 #[inline]
245 pub fn resize(&self, buf: &mut BytesVec) {
247 let remaining = buf.remaining_mut();
248 if remaining < self.low {
249 buf.reserve(self.high - remaining);
250 }
251 }
252
253 #[inline]
254 pub fn release(&self, buf: BytesVec) {
256 let cap = buf.capacity();
257 if cap > self.low && cap <= self.high {
258 CACHE.with(|c| {
259 c.with(self.idx, |v: &mut Vec<_>| {
260 if v.len() < self.cache_size {
261 v.push(buf);
262 }
263 })
264 })
265 }
266 }
267}
268
269struct LocalCache {
270 cache: UnsafeCell<Vec<Vec<BytesVec>>>,
271}
272
273impl LocalCache {
274 fn new() -> Self {
275 Self {
276 cache: UnsafeCell::new(Vec::with_capacity(16)),
277 }
278 }
279
280 fn with<F, R>(&self, idx: usize, f: F) -> R
281 where
282 F: FnOnce(&mut Vec<BytesVec>) -> R,
283 {
284 let cache = unsafe { &mut *self.cache.get() };
285
286 while cache.len() <= idx {
287 cache.push(Vec::new())
288 }
289 f(&mut cache[idx])
290 }
291}