1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytesVec, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
7const DEFAULT_CACHE_SIZE: usize = 128;
8thread_local! {
9 static CACHE: LocalCache = LocalCache::new();
10}
11
12#[derive(Clone, Debug)]
13pub struct IoConfig {
15 connect_timeout: Millis,
16 keepalive_timeout: Seconds,
17 disconnect_timeout: Seconds,
18 frame_read_rate: Option<FrameReadRate>,
19
20 read_buf: BufConfig,
22 write_buf: BufConfig,
23
24 pub(crate) config: CfgContext,
26}
27
28impl Default for IoConfig {
29 fn default() -> Self {
30 IoConfig::new()
31 }
32}
33
34impl Configuration for IoConfig {
35 const NAME: &str = "IO Configuration";
36
37 fn ctx(&self) -> &CfgContext {
38 &self.config
39 }
40
41 fn set_ctx(&mut self, ctx: CfgContext) {
42 self.config = ctx;
43 self.read_buf.idx = ctx.id();
44 self.write_buf.idx = ctx.id();
45 }
46}
47
48#[derive(Copy, Clone, Debug)]
49pub struct FrameReadRate {
50 pub timeout: Seconds,
51 pub max_timeout: Seconds,
52 pub rate: u32,
53}
54
55#[derive(Copy, Clone, Debug)]
56pub struct BufConfig {
57 pub high: usize,
58 pub low: usize,
59 pub half: usize,
60 idx: usize,
61 first: bool,
62 cache_size: usize,
63}
64
65impl IoConfig {
66 #[inline]
67 #[allow(clippy::new_without_default)]
68 pub fn new() -> IoConfig {
70 let config = CfgContext::default();
71
72 IoConfig {
73 config,
74 connect_timeout: Millis::ZERO,
75 keepalive_timeout: Seconds(0),
76 disconnect_timeout: Seconds(1),
77 frame_read_rate: None,
78
79 read_buf: BufConfig {
80 high: 16 * 1024,
81 low: 1024,
82 half: 8 * 1024,
83 idx: config.id(),
84 first: true,
85 cache_size: DEFAULT_CACHE_SIZE,
86 },
87 write_buf: BufConfig {
88 high: 16 * 1024,
89 low: 1024,
90 half: 8 * 1024,
91 idx: config.id(),
92 first: false,
93 cache_size: DEFAULT_CACHE_SIZE,
94 },
95 }
96 }
97
98 #[inline]
99 pub fn tag(&self) -> &str {
101 self.config.tag()
102 }
103
104 #[inline]
105 pub fn connect_timeout(&self) -> Millis {
107 self.connect_timeout
108 }
109
110 #[inline]
111 pub fn keepalive_timeout(&self) -> Seconds {
113 self.keepalive_timeout
114 }
115
116 #[inline]
117 pub fn disconnect_timeout(&self) -> Seconds {
119 self.disconnect_timeout
120 }
121
122 #[inline]
123 pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
125 self.frame_read_rate.as_ref()
126 }
127
128 #[inline]
129 pub fn read_buf(&self) -> &BufConfig {
131 &self.read_buf
132 }
133
134 #[inline]
135 pub fn write_buf(&self) -> &BufConfig {
137 &self.write_buf
138 }
139
140 pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
146 self.connect_timeout = timeout.into();
147 self
148 }
149
150 pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
156 self.keepalive_timeout = timeout.into();
157 self
158 }
159
160 pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
169 self.disconnect_timeout = timeout.into();
170 self
171 }
172
173 pub fn set_frame_read_rate(
181 mut self,
182 timeout: Seconds,
183 max_timeout: Seconds,
184 rate: u32,
185 ) -> Self {
186 self.frame_read_rate = Some(FrameReadRate {
187 timeout,
188 max_timeout,
189 rate,
190 });
191 self
192 }
193
194 pub fn set_read_buf(
198 mut self,
199 high_watermark: usize,
200 low_watermark: usize,
201 cache_size: usize,
202 ) -> Self {
203 self.read_buf.cache_size = cache_size;
204 self.read_buf.high = high_watermark;
205 self.read_buf.low = low_watermark;
206 self.read_buf.half = high_watermark >> 1;
207 self
208 }
209
210 pub fn set_write_buf(
214 mut self,
215 high_watermark: usize,
216 low_watermark: usize,
217 cache_size: usize,
218 ) -> Self {
219 self.write_buf.cache_size = cache_size;
220 self.write_buf.high = high_watermark;
221 self.write_buf.low = low_watermark;
222 self.write_buf.half = high_watermark >> 1;
223 self
224 }
225}
226
227impl BufConfig {
228 #[inline]
229 pub fn get(&self) -> BytesVec {
231 if let Some(mut buf) =
232 CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
233 {
234 buf.clear();
235 buf
236 } else {
237 BytesVec::with_capacity(self.high)
238 }
239 }
240
241 pub fn buf_with_capacity(&self, cap: usize) -> BytesVec {
243 BytesVec::with_capacity(cap)
244 }
245
246 #[inline]
247 pub fn resize(&self, buf: &mut BytesVec) {
249 let remaining = buf.remaining_mut();
250 if remaining < self.low {
251 buf.reserve(self.high - remaining);
252 }
253 }
254
255 #[inline]
256 pub fn release(&self, buf: BytesVec) {
258 let cap = buf.capacity();
259 if cap > self.low && cap <= self.high {
260 CACHE.with(|c| {
261 c.with(self.idx, self.first, |v: &mut Vec<_>| {
262 if v.len() < self.cache_size {
263 v.push(buf);
264 }
265 })
266 })
267 }
268 }
269}
270
271struct LocalCache {
272 cache: UnsafeCell<Vec<(Vec<BytesVec>, Vec<BytesVec>)>>,
273}
274
275impl LocalCache {
276 fn new() -> Self {
277 Self {
278 cache: UnsafeCell::new(Vec::with_capacity(16)),
279 }
280 }
281
282 fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
283 where
284 F: FnOnce(&mut Vec<BytesVec>) -> R,
285 {
286 let cache = unsafe { &mut *self.cache.get() };
287
288 while cache.len() <= idx {
289 cache.push((Vec::new(), Vec::new()))
290 }
291 if first {
292 f(&mut cache[idx].0)
293 } else {
294 f(&mut cache[idx].1)
295 }
296 }
297}