1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytesMut, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
7const DEFAULT_CACHE_SIZE: usize = 128;
8const DEFAULT_HIGH: usize = 16 * 1024 - 24;
9const DEFAULT_LOW: usize = 512 + 24;
10const DEFAULT_HALF: usize = (16 * 1024 - 24) / 2;
11
12thread_local! {
13 static CACHE: LocalCache = LocalCache::new();
14}
15
16#[derive(Debug)]
17pub struct IoConfig {
19 connect_timeout: Millis,
20 keepalive_timeout: Seconds,
21 disconnect_timeout: Seconds,
22 frame_read_rate: Option<FrameReadRate>,
23
24 read_buf: BufConfig,
26 write_buf: BufConfig,
27
28 pub(crate) config: CfgContext,
30}
31
32impl Default for IoConfig {
33 fn default() -> Self {
34 IoConfig::new()
35 }
36}
37
38impl Configuration for IoConfig {
39 const NAME: &str = "IO Configuration";
40
41 fn ctx(&self) -> &CfgContext {
42 &self.config
43 }
44
45 fn set_ctx(&mut self, ctx: CfgContext) {
46 self.read_buf.idx = ctx.id();
47 self.write_buf.idx = ctx.id();
48 self.config = ctx;
49 }
50}
51
52#[derive(Copy, Clone, Debug)]
53pub struct FrameReadRate {
54 pub timeout: Seconds,
55 pub max_timeout: Seconds,
56 pub rate: u32,
57}
58
59#[derive(Copy, Clone, Debug)]
60pub struct BufConfig {
61 pub high: usize,
62 pub low: usize,
63 pub half: usize,
64 idx: usize,
65 first: bool,
66 cache_size: usize,
67}
68
69impl IoConfig {
70 #[inline]
71 #[must_use]
72 pub fn new() -> IoConfig {
74 let config = CfgContext::default();
75 let idx = config.id();
76
77 IoConfig {
78 config,
79 connect_timeout: Millis::ZERO,
80 keepalive_timeout: Seconds(0),
81 disconnect_timeout: Seconds(1),
82 frame_read_rate: None,
83
84 read_buf: BufConfig {
85 idx,
86 high: DEFAULT_HIGH,
87 low: DEFAULT_LOW,
88 half: DEFAULT_HALF,
89 first: true,
90 cache_size: DEFAULT_CACHE_SIZE,
91 },
92 write_buf: BufConfig {
93 idx,
94 high: DEFAULT_HIGH,
95 low: DEFAULT_LOW,
96 half: DEFAULT_HALF,
97 first: false,
98 cache_size: DEFAULT_CACHE_SIZE,
99 },
100 }
101 }
102
103 #[inline]
104 pub fn tag(&self) -> &str {
106 self.config.tag()
107 }
108
109 #[inline]
110 pub fn connect_timeout(&self) -> Millis {
112 self.connect_timeout
113 }
114
115 #[inline]
116 pub fn keepalive_timeout(&self) -> Seconds {
118 self.keepalive_timeout
119 }
120
121 #[inline]
122 pub fn disconnect_timeout(&self) -> Seconds {
124 self.disconnect_timeout
125 }
126
127 #[inline]
128 pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
130 self.frame_read_rate.as_ref()
131 }
132
133 #[inline]
134 pub fn read_buf(&self) -> &BufConfig {
136 &self.read_buf
137 }
138
139 #[inline]
140 pub fn write_buf(&self) -> &BufConfig {
142 &self.write_buf
143 }
144
145 #[must_use]
151 pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
152 self.connect_timeout = timeout.into();
153 self
154 }
155
156 #[must_use]
162 pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
163 self.keepalive_timeout = timeout.into();
164 self
165 }
166
167 #[must_use]
176 pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
177 self.disconnect_timeout = timeout.into();
178 self
179 }
180
181 #[must_use]
189 pub fn set_frame_read_rate(
190 mut self,
191 timeout: Seconds,
192 max_timeout: Seconds,
193 rate: u32,
194 ) -> Self {
195 self.frame_read_rate = Some(FrameReadRate {
196 timeout,
197 max_timeout,
198 rate,
199 });
200 self
201 }
202
203 #[must_use]
207 pub fn set_read_buf(
208 mut self,
209 high_watermark: usize,
210 low_watermark: usize,
211 cache_size: usize,
212 ) -> Self {
213 self.read_buf.cache_size = cache_size;
214 self.read_buf.high = high_watermark;
215 self.read_buf.low = low_watermark;
216 self.read_buf.half = high_watermark >> 1;
217 self
218 }
219
220 #[must_use]
224 pub fn set_write_buf(
225 mut self,
226 high_watermark: usize,
227 low_watermark: usize,
228 cache_size: usize,
229 ) -> Self {
230 self.write_buf.cache_size = cache_size;
231 self.write_buf.high = high_watermark;
232 self.write_buf.low = low_watermark;
233 self.write_buf.half = high_watermark >> 1;
234 self
235 }
236}
237
238impl BufConfig {
239 #[inline]
240 pub fn get(&self) -> BytesMut {
242 if let Some(mut buf) =
243 CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
244 {
245 buf.clear();
246 buf
247 } else {
248 BytesMut::with_capacity(self.high)
249 }
250 }
251
252 pub fn buf_with_capacity(&self, cap: usize) -> BytesMut {
254 BytesMut::with_capacity(cap)
255 }
256
257 #[inline]
258 pub fn resize(&self, buf: &mut BytesMut) {
260 if buf.remaining_mut() < self.low {
261 self.resize_min(buf, self.high);
262 }
263 }
264
265 #[inline]
266 pub fn resize_min(&self, buf: &mut BytesMut, size: usize) {
268 let mut avail = buf.remaining_mut();
269 if avail < size {
270 let mut new_cap = buf.capacity();
271 while avail < size {
272 avail += self.high;
273 new_cap += self.high;
274 }
275 buf.reserve_capacity(new_cap);
276 }
277 }
278
279 #[inline]
280 pub fn release(&self, buf: BytesMut) {
282 let cap = buf.capacity();
283 if cap > self.low && cap <= self.high {
284 CACHE.with(|c| {
285 c.with(self.idx, self.first, |v: &mut Vec<_>| {
286 if v.len() < self.cache_size {
287 v.push(buf);
288 }
289 });
290 });
291 }
292 }
293}
294
295struct LocalCache {
296 cache: UnsafeCell<Vec<(Vec<BytesMut>, Vec<BytesMut>)>>,
297}
298
299impl LocalCache {
300 fn new() -> Self {
301 Self {
302 cache: UnsafeCell::new(Vec::with_capacity(16)),
303 }
304 }
305
306 fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
307 where
308 F: FnOnce(&mut Vec<BytesMut>) -> R,
309 {
310 let cache = unsafe { &mut *self.cache.get() };
311
312 while cache.len() <= idx {
313 cache.push((Vec::new(), Vec::new()));
314 }
315 if first {
316 f(&mut cache[idx].0)
317 } else {
318 f(&mut cache[idx].1)
319 }
320 }
321}