Skip to main content

ntex_io/
cfg.rs

1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytesMut, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
/// Default limit on how many released buffers a single cache list retains.
const DEFAULT_CACHE_SIZE: usize = 128;
/// Default high watermark: 16 KiB less 24.
const DEFAULT_HIGH: usize = (16 << 10) - 24;
/// Default low watermark: 512 plus 24.
const DEFAULT_LOW: usize = 24 + 512;
/// Default half watermark: one half of the default high watermark.
const DEFAULT_HALF: usize = DEFAULT_HIGH / 2;
11
thread_local! {
    // Per-thread pool of released buffers. `BufConfig::get` pops from it and
    // `BufConfig::release` pushes into it, so buffers are only ever reused on
    // the thread that released them.
    static CACHE: LocalCache = LocalCache::new();
}
15
/// Base io configuration
///
/// Bundles the connection timeouts, the optional frame read rate and the
/// read/write buffer parameters together with the shared configuration context.
#[derive(Debug)]
pub struct IoConfig {
    // Connect timeout; `Millis::ZERO` when created via `IoConfig::new`.
    connect_timeout: Millis,
    // Keep-alive timeout; `Seconds(0)` when created via `IoConfig::new`.
    keepalive_timeout: Seconds,
    // Disconnect timeout; `Seconds(1)` when created via `IoConfig::new`.
    disconnect_timeout: Seconds,
    // Per-frame read rate parameters; `None` until `set_frame_read_rate` is called.
    frame_read_rate: Option<FrameReadRate>,

    // io read/write cache and params
    read_buf: BufConfig,
    write_buf: BufConfig,

    // shared config; its id is mirrored into both buffer configs as their cache index
    pub(crate) config: CfgContext,
}
31
32impl Default for IoConfig {
33    fn default() -> Self {
34        IoConfig::new()
35    }
36}
37
38impl Configuration for IoConfig {
39    const NAME: &str = "IO Configuration";
40
41    fn ctx(&self) -> &CfgContext {
42        &self.config
43    }
44
45    fn set_ctx(&mut self, ctx: CfgContext) {
46        self.read_buf.idx = ctx.id();
47        self.write_buf.idx = ctx.id();
48        self.config = ctx;
49    }
50}
51
/// Read rate parameters for a single frame, as stored by
/// `IoConfig::set_frame_read_rate`.
#[derive(Copy, Clone, Debug)]
pub struct FrameReadRate {
    /// Read timeout; also the amount by which the timeout is extended when the
    /// required rate is met.
    pub timeout: Seconds,
    /// Upper limit for the extended timeout.
    pub max_timeout: Seconds,
    /// Amount of data that must arrive within `timeout` for the timeout to be
    /// extended (presumably bytes; confirm against the reader implementation).
    pub rate: u32,
}
58
/// Buffer parameters plus the coordinates of the matching thread-local cache list.
#[derive(Copy, Clone, Debug)]
pub struct BufConfig {
    /// High watermark: capacity of freshly allocated buffers, growth step used by
    /// `resize_min`, and the largest capacity `release` will cache.
    pub high: usize,
    /// Low watermark: `resize` grows a buffer once its remaining space drops below
    /// this value; `release` only caches buffers whose capacity exceeds it.
    pub low: usize,
    /// Half of the high watermark (`high >> 1` when set through `IoConfig`).
    pub half: usize,
    // Slot index into the thread-local cache; mirrors the configuration context id.
    idx: usize,
    // Selects which list of the slot's pair is used: `true` for the first
    // (read buffers), `false` for the second (write buffers).
    first: bool,
    // Maximum number of buffers `release` keeps in the selected list.
    cache_size: usize,
}
68
69impl IoConfig {
70    #[inline]
71    #[must_use]
72    /// Create new config object
73    pub fn new() -> IoConfig {
74        let config = CfgContext::default();
75        let idx = config.id();
76
77        IoConfig {
78            config,
79            connect_timeout: Millis::ZERO,
80            keepalive_timeout: Seconds(0),
81            disconnect_timeout: Seconds(1),
82            frame_read_rate: None,
83
84            read_buf: BufConfig {
85                idx,
86                high: DEFAULT_HIGH,
87                low: DEFAULT_LOW,
88                half: DEFAULT_HALF,
89                first: true,
90                cache_size: DEFAULT_CACHE_SIZE,
91            },
92            write_buf: BufConfig {
93                idx,
94                high: DEFAULT_HIGH,
95                low: DEFAULT_LOW,
96                half: DEFAULT_HALF,
97                first: false,
98                cache_size: DEFAULT_CACHE_SIZE,
99            },
100        }
101    }
102
103    #[inline]
104    /// Get tag
105    pub fn tag(&self) -> &str {
106        self.config.tag()
107    }
108
109    #[inline]
110    /// Get connect timeout
111    pub fn connect_timeout(&self) -> Millis {
112        self.connect_timeout
113    }
114
115    #[inline]
116    /// Get keep-alive timeout
117    pub fn keepalive_timeout(&self) -> Seconds {
118        self.keepalive_timeout
119    }
120
121    #[inline]
122    /// Get disconnect timeout
123    pub fn disconnect_timeout(&self) -> Seconds {
124        self.disconnect_timeout
125    }
126
127    #[inline]
128    /// Get frame read params
129    pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
130        self.frame_read_rate.as_ref()
131    }
132
133    #[inline]
134    /// Get read buffer parameters
135    pub fn read_buf(&self) -> &BufConfig {
136        &self.read_buf
137    }
138
139    #[inline]
140    /// Get write buffer parameters
141    pub fn write_buf(&self) -> &BufConfig {
142        &self.write_buf
143    }
144
145    /// Set connect timeout in seconds.
146    ///
147    /// To disable timeout set value to 0.
148    ///
149    /// By default connect timeout is disabled.
150    #[must_use]
151    pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
152        self.connect_timeout = timeout.into();
153        self
154    }
155
156    /// Set keep-alive timeout in seconds.
157    ///
158    /// To disable timeout set value to 0.
159    ///
160    /// By default keep-alive timeout is disabled.
161    #[must_use]
162    pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
163        self.keepalive_timeout = timeout.into();
164        self
165    }
166
167    /// Set connection disconnect timeout.
168    ///
169    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
170    /// within this time, the connection get dropped.
171    ///
172    /// To disable timeout set value to 0.
173    ///
174    /// By default disconnect timeout is set to 1 seconds.
175    #[must_use]
176    pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
177        self.disconnect_timeout = timeout.into();
178        self
179    }
180
181    /// Set read rate parameters for single frame.
182    ///
183    /// Set read timeout, max timeout and rate for reading payload. If the client
184    /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
185    /// But no more than `max_timeout` timeout.
186    ///
187    /// By default frame read rate is disabled.
188    #[must_use]
189    pub fn set_frame_read_rate(
190        mut self,
191        timeout: Seconds,
192        max_timeout: Seconds,
193        rate: u32,
194    ) -> Self {
195        self.frame_read_rate = Some(FrameReadRate {
196            timeout,
197            max_timeout,
198            rate,
199        });
200        self
201    }
202
203    /// Set read buffer parameters.
204    ///
205    /// By default high watermark is set to 16Kb, low watermark 1kb.
206    #[must_use]
207    pub fn set_read_buf(
208        mut self,
209        high_watermark: usize,
210        low_watermark: usize,
211        cache_size: usize,
212    ) -> Self {
213        self.read_buf.cache_size = cache_size;
214        self.read_buf.high = high_watermark;
215        self.read_buf.low = low_watermark;
216        self.read_buf.half = high_watermark >> 1;
217        self
218    }
219
220    /// Set write buffer parameters.
221    ///
222    /// By default high watermark is set to 16Kb, low watermark 1kb.
223    #[must_use]
224    pub fn set_write_buf(
225        mut self,
226        high_watermark: usize,
227        low_watermark: usize,
228        cache_size: usize,
229    ) -> Self {
230        self.write_buf.cache_size = cache_size;
231        self.write_buf.high = high_watermark;
232        self.write_buf.low = low_watermark;
233        self.write_buf.half = high_watermark >> 1;
234        self
235    }
236}
237
238impl BufConfig {
239    #[inline]
240    /// Get buffer
241    pub fn get(&self) -> BytesMut {
242        if let Some(mut buf) =
243            CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
244        {
245            buf.clear();
246            buf
247        } else {
248            BytesMut::with_capacity(self.high)
249        }
250    }
251
252    /// Get buffer with capacity
253    pub fn buf_with_capacity(&self, cap: usize) -> BytesMut {
254        BytesMut::with_capacity(cap)
255    }
256
257    #[inline]
258    /// Resize buffer
259    pub fn resize(&self, buf: &mut BytesMut) {
260        if buf.remaining_mut() < self.low {
261            self.resize_min(buf, self.high);
262        }
263    }
264
265    #[inline]
266    /// Resize buffer
267    pub fn resize_min(&self, buf: &mut BytesMut, size: usize) {
268        let mut avail = buf.remaining_mut();
269        if avail < size {
270            let mut new_cap = buf.capacity();
271            while avail < size {
272                avail += self.high;
273                new_cap += self.high;
274            }
275            buf.reserve_capacity(new_cap);
276        }
277    }
278
279    #[inline]
280    /// Release buffer, buf must be allocated from this pool
281    pub fn release(&self, buf: BytesMut) {
282        let cap = buf.capacity();
283        if cap > self.low && cap <= self.high {
284            CACHE.with(|c| {
285                c.with(self.idx, self.first, |v: &mut Vec<_>| {
286                    if v.len() < self.cache_size {
287                        v.push(buf);
288                    }
289                });
290            });
291        }
292    }
293}
294
/// Storage behind the thread-local `CACHE`.
///
/// One slot per configuration index; each slot is a pair of buffer lists, the
/// first selected by `BufConfig::first == true` and the second otherwise.
struct LocalCache {
    // Interior mutability without runtime borrow tracking; all access goes
    // through `LocalCache::with`.
    cache: UnsafeCell<Vec<(Vec<BytesMut>, Vec<BytesMut>)>>,
}
298
299impl LocalCache {
300    fn new() -> Self {
301        Self {
302            cache: UnsafeCell::new(Vec::with_capacity(16)),
303        }
304    }
305
306    fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
307    where
308        F: FnOnce(&mut Vec<BytesMut>) -> R,
309    {
310        let cache = unsafe { &mut *self.cache.get() };
311
312        while cache.len() <= idx {
313            cache.push((Vec::new(), Vec::new()));
314        }
315        if first {
316            f(&mut cache[idx].0)
317        } else {
318            f(&mut cache[idx].1)
319        }
320    }
321}