1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytePageSize, BytesMut, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
/// Default maximum number of released buffers kept per cache pool.
const DEFAULT_CACHE_SIZE: usize = 128;
/// Default high watermark in bytes (16 KiB minus 24 bytes).
const DEFAULT_HIGH: usize = 16 * 1024 - 24;
/// Default low watermark in bytes (512 plus 24 bytes).
const DEFAULT_LOW: usize = 512 + 24;
/// Default half mark: exactly one half of [`DEFAULT_HIGH`].
///
/// Derived from `DEFAULT_HIGH` so the two values cannot drift apart.
const DEFAULT_HALF: usize = DEFAULT_HIGH / 2;
11
12thread_local! {
13 static CACHE: LocalCache = LocalCache::new();
14}
15
16#[derive(Debug)]
17pub struct IoConfig {
19 connect_timeout: Millis,
20 keepalive_timeout: Seconds,
21 disconnect_timeout: Seconds,
22 frame_read_rate: Option<FrameReadRate>,
23
24 read_buf: BufConfig,
26 write_buf: BufConfig,
27 write_page_size: BytePageSize,
28 write_buf_threshold: usize,
29
30 pub(crate) config: CfgContext,
32}
33
34impl Default for IoConfig {
35 fn default() -> Self {
36 IoConfig::new()
37 }
38}
39
40impl Configuration for IoConfig {
41 const NAME: &str = "IO Configuration";
42
43 fn ctx(&self) -> &CfgContext {
44 &self.config
45 }
46
47 fn set_ctx(&mut self, ctx: CfgContext) {
48 self.read_buf.idx = ctx.id();
49 self.write_buf.idx = ctx.id();
50 self.config = ctx;
51 }
52}
53
54#[derive(Copy, Clone, Debug)]
55pub struct FrameReadRate {
56 pub timeout: Seconds,
57 pub max_timeout: Seconds,
58 pub rate: u32,
59}
60
/// Watermarks and cache parameters for one kind of I/O buffer (read or write).
#[derive(Copy, Clone, Debug)]
pub struct BufConfig {
    /// High watermark: capacity of freshly allocated buffers, growth step used
    /// when resizing, and the largest capacity that is still cached on release.
    pub high: usize,
    /// Low watermark: a buffer is grown once its spare capacity drops below
    /// this value, and only buffers with a larger capacity are cached on release.
    pub low: usize,
    /// Half of the high watermark, as computed by the `IoConfig` setters.
    pub half: usize,
    /// Index of this config's slot in the thread-local buffer cache.
    idx: usize,
    /// Selects which of the slot's two pools is used: `true` for the first
    /// (read buffers in `IoConfig::new`), `false` for the second (write buffers).
    first: bool,
    /// Maximum number of buffers kept in the pool.
    cache_size: usize,
}
70
71impl IoConfig {
72 #[inline]
73 #[must_use]
74 pub fn new() -> IoConfig {
76 let config = CfgContext::default();
77 let idx = config.id();
78
79 IoConfig {
80 config,
81 connect_timeout: Millis::ZERO,
82 keepalive_timeout: Seconds(0),
83 disconnect_timeout: Seconds(1),
84 frame_read_rate: None,
85
86 read_buf: BufConfig {
87 idx,
88 high: DEFAULT_HIGH,
89 low: DEFAULT_LOW,
90 half: DEFAULT_HALF,
91 first: true,
92 cache_size: DEFAULT_CACHE_SIZE,
93 },
94 write_buf: BufConfig {
95 idx,
96 high: DEFAULT_HIGH,
97 low: DEFAULT_LOW,
98 half: DEFAULT_HALF,
99 first: false,
100 cache_size: DEFAULT_CACHE_SIZE,
101 },
102 write_page_size: BytePageSize::Size16,
103 write_buf_threshold: BytePageSize::Size16.half_capacity(),
104 }
105 }
106
107 #[inline]
108 pub fn tag(&self) -> &str {
110 self.config.tag()
111 }
112
113 #[inline]
114 pub fn connect_timeout(&self) -> Millis {
116 self.connect_timeout
117 }
118
119 #[inline]
120 pub fn keepalive_timeout(&self) -> Seconds {
122 self.keepalive_timeout
123 }
124
125 #[inline]
126 pub fn disconnect_timeout(&self) -> Seconds {
128 self.disconnect_timeout
129 }
130
131 #[inline]
132 pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
134 self.frame_read_rate.as_ref()
135 }
136
137 #[inline]
138 pub fn read_buf(&self) -> &BufConfig {
140 &self.read_buf
141 }
142
143 #[inline]
144 pub fn write_buf(&self) -> &BufConfig {
146 &self.write_buf
147 }
148
149 #[inline]
150 pub fn write_page_size(&self) -> BytePageSize {
152 self.write_page_size
153 }
154
155 #[inline]
156 pub fn write_buf_threshold(&self) -> usize {
158 self.write_buf_threshold
159 }
160
161 #[must_use]
167 pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
168 self.connect_timeout = timeout.into();
169 self
170 }
171
172 #[must_use]
178 pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
179 self.keepalive_timeout = timeout.into();
180 self
181 }
182
183 #[must_use]
192 pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
193 self.disconnect_timeout = timeout.into();
194 self
195 }
196
197 #[must_use]
205 pub fn set_frame_read_rate(
206 mut self,
207 timeout: Seconds,
208 max_timeout: Seconds,
209 rate: u32,
210 ) -> Self {
211 self.frame_read_rate = Some(FrameReadRate {
212 timeout,
213 max_timeout,
214 rate,
215 });
216 self
217 }
218
219 #[must_use]
223 pub fn set_read_buf(
224 mut self,
225 high_watermark: usize,
226 low_watermark: usize,
227 cache_size: usize,
228 ) -> Self {
229 self.read_buf.cache_size = cache_size;
230 self.read_buf.high = high_watermark;
231 self.read_buf.low = low_watermark;
232 self.read_buf.half = high_watermark >> 1;
233 self
234 }
235
236 #[must_use]
240 pub fn set_write_page_size(mut self, size: BytePageSize) -> Self {
241 self.write_page_size = size;
242 self
243 }
244
245 #[must_use]
260 pub fn set_write_buf_threshold(mut self, size: usize) -> Self {
261 self.write_buf_threshold = size;
262 self
263 }
264
265 #[must_use]
269 pub fn set_write_buf(
270 mut self,
271 high_watermark: usize,
272 low_watermark: usize,
273 cache_size: usize,
274 ) -> Self {
275 self.write_buf.cache_size = cache_size;
276 self.write_buf.high = high_watermark;
277 self.write_buf.low = low_watermark;
278 self.write_buf.half = high_watermark >> 1;
279 self
280 }
281}
282
283impl BufConfig {
284 #[inline]
285 pub fn get(&self) -> BytesMut {
287 if let Some(buf) =
288 CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
289 {
290 buf
291 } else {
292 BytesMut::with_capacity(self.high)
293 }
294 }
295
296 pub fn buf_with_capacity(&self, cap: usize) -> BytesMut {
298 BytesMut::with_capacity(cap)
299 }
300
301 #[inline]
302 pub fn resize(&self, buf: &mut BytesMut) {
304 if buf.remaining_mut() < self.low {
305 self.resize_min(buf, self.high);
306 }
307 }
308
309 #[inline]
310 pub fn resize_min(&self, buf: &mut BytesMut, size: usize) {
312 let mut avail = buf.remaining_mut();
313 if avail < size {
314 let mut new_cap = buf.capacity();
315 while avail < size {
316 avail += self.high;
317 new_cap += self.high;
318 }
319 buf.reserve_capacity(new_cap);
320 }
321 }
322
323 #[inline]
324 pub fn release(&self, mut buf: BytesMut) {
326 let cap = buf.capacity();
327 if cap > self.low && cap <= self.high {
328 CACHE.with(|c| {
329 c.with(self.idx, self.first, |v: &mut Vec<_>| {
330 if v.len() < self.cache_size {
331 buf.clear();
332 v.push(buf);
333 }
334 });
335 });
336 }
337 }
338}
339
340struct LocalCache {
341 cache: UnsafeCell<Vec<(Vec<BytesMut>, Vec<BytesMut>)>>,
342}
343
344impl LocalCache {
345 fn new() -> Self {
346 Self {
347 cache: UnsafeCell::new(Vec::with_capacity(16)),
348 }
349 }
350
351 fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
352 where
353 F: FnOnce(&mut Vec<BytesMut>) -> R,
354 {
355 let cache = unsafe { &mut *self.cache.get() };
356
357 while cache.len() <= idx {
358 cache.push((Vec::new(), Vec::new()));
359 }
360 if first {
361 f(&mut cache[idx].0)
362 } else {
363 f(&mut cache[idx].1)
364 }
365 }
366}