ntex_io/
cfg.rs

1use std::cell::UnsafeCell;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use ntex_bytes::{BytesVec, buf::BufMut};
5use ntex_service::cfg::{CfgContext, Configuration};
6use ntex_util::{time::Millis, time::Seconds};
7
8const DEFAULT_CACHE_SIZE: usize = 128;
9static IDX: AtomicUsize = AtomicUsize::new(0);
10
11thread_local! {
12    static CACHE: LocalCache = LocalCache::new();
13}
14
15#[derive(Clone, Debug)]
16/// Base io configuration
17pub struct IoConfig {
18    connect_timeout: Millis,
19    keepalive_timeout: Seconds,
20    disconnect_timeout: Seconds,
21    frame_read_rate: Option<FrameReadRate>,
22
23    // io read/write cache and params
24    read_buf: BufConfig,
25    write_buf: BufConfig,
26
27    // shared config
28    pub(crate) config: CfgContext,
29}
30
31impl Configuration for IoConfig {
32    const NAME: &str = "IO Configuration";
33
34    fn default() -> &'static Self {
35        thread_local! {
36            static DEFAULT_CFG: &'static IoConfig =
37                Box::leak(Box::new(IoConfig::new()));
38        }
39        DEFAULT_CFG.with(|cfg| *cfg)
40    }
41
42    fn ctx(&self) -> &CfgContext {
43        &self.config
44    }
45
46    fn set_ctx(&mut self, ctx: CfgContext) {
47        self.config = ctx;
48    }
49}
50
51#[derive(Copy, Clone, Debug)]
52pub struct FrameReadRate {
53    pub timeout: Seconds,
54    pub max_timeout: Seconds,
55    pub rate: u32,
56}
57
58#[derive(Copy, Clone, Debug)]
59pub struct BufConfig {
60    pub high: usize,
61    pub low: usize,
62    pub half: usize,
63    idx: usize,
64    cache_size: usize,
65}
66
67impl IoConfig {
68    #[inline]
69    #[allow(clippy::new_without_default)]
70    /// Create new config object
71    pub fn new() -> IoConfig {
72        let idx1 = IDX.fetch_add(1, Ordering::SeqCst);
73        let idx2 = IDX.fetch_add(1, Ordering::SeqCst);
74        IoConfig {
75            connect_timeout: Millis::ZERO,
76            keepalive_timeout: Seconds(0),
77            disconnect_timeout: Seconds(1),
78            frame_read_rate: None,
79
80            read_buf: BufConfig {
81                high: 16 * 1024,
82                low: 1024,
83                half: 8 * 1024,
84                idx: idx1,
85                cache_size: DEFAULT_CACHE_SIZE,
86            },
87            write_buf: BufConfig {
88                high: 16 * 1024,
89                low: 1024,
90                half: 8 * 1024,
91                idx: idx2,
92                cache_size: DEFAULT_CACHE_SIZE,
93            },
94            config: CfgContext::default(),
95        }
96    }
97
98    #[inline]
99    /// Get tag
100    pub fn tag(&self) -> &str {
101        self.config.tag()
102    }
103
104    #[inline]
105    /// Get connect timeout
106    pub fn connect_timeout(&self) -> Millis {
107        self.connect_timeout
108    }
109
110    #[inline]
111    /// Get keep-alive timeout
112    pub fn keepalive_timeout(&self) -> Seconds {
113        self.keepalive_timeout
114    }
115
116    #[inline]
117    /// Get disconnect timeout
118    pub fn disconnect_timeout(&self) -> Seconds {
119        self.disconnect_timeout
120    }
121
122    #[inline]
123    /// Get frame read params
124    pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
125        self.frame_read_rate.as_ref()
126    }
127
128    #[inline]
129    /// Get read buffer parameters
130    pub fn read_buf(&self) -> &BufConfig {
131        &self.read_buf
132    }
133
134    #[inline]
135    /// Get write buffer parameters
136    pub fn write_buf(&self) -> &BufConfig {
137        &self.write_buf
138    }
139
140    /// Set connect timeout in seconds.
141    ///
142    /// To disable timeout set value to 0.
143    ///
144    /// By default connect timeout is disabled.
145    pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
146        self.connect_timeout = timeout.into();
147        self
148    }
149
150    /// Set keep-alive timeout in seconds.
151    ///
152    /// To disable timeout set value to 0.
153    ///
154    /// By default keep-alive timeout is disabled.
155    pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
156        self.keepalive_timeout = timeout.into();
157        self
158    }
159
160    /// Set connection disconnect timeout.
161    ///
162    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
163    /// within this time, the connection get dropped.
164    ///
165    /// To disable timeout set value to 0.
166    ///
167    /// By default disconnect timeout is set to 1 seconds.
168    pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
169        self.disconnect_timeout = timeout.into();
170        self
171    }
172
173    /// Set read rate parameters for single frame.
174    ///
175    /// Set read timeout, max timeout and rate for reading payload. If the client
176    /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
177    /// But no more than `max_timeout` timeout.
178    ///
179    /// By default frame read rate is disabled.
180    pub fn set_frame_read_rate(
181        mut self,
182        timeout: Seconds,
183        max_timeout: Seconds,
184        rate: u32,
185    ) -> Self {
186        self.frame_read_rate = Some(FrameReadRate {
187            timeout,
188            max_timeout,
189            rate,
190        });
191        self
192    }
193
194    /// Set read buffer parameters.
195    ///
196    /// By default high watermark is set to 16Kb, low watermark 1kb.
197    pub fn set_read_buf(
198        mut self,
199        high_watermark: usize,
200        low_watermark: usize,
201        cache_size: usize,
202    ) -> Self {
203        self.read_buf.cache_size = cache_size;
204        self.read_buf.high = high_watermark;
205        self.read_buf.low = low_watermark;
206        self.read_buf.half = high_watermark >> 1;
207        self
208    }
209
210    /// Set write buffer parameters.
211    ///
212    /// By default high watermark is set to 16Kb, low watermark 1kb.
213    pub fn set_write_buf(
214        mut self,
215        high_watermark: usize,
216        low_watermark: usize,
217        cache_size: usize,
218    ) -> Self {
219        self.write_buf.cache_size = cache_size;
220        self.write_buf.high = high_watermark;
221        self.write_buf.low = low_watermark;
222        self.write_buf.half = high_watermark >> 1;
223        self
224    }
225}
226
227impl BufConfig {
228    #[inline]
229    /// Get buffer
230    pub fn get(&self) -> BytesVec {
231        if let Some(mut buf) = CACHE.with(|c| c.with(self.idx, |c: &mut Vec<_>| c.pop())) {
232            buf.clear();
233            buf
234        } else {
235            BytesVec::with_capacity(self.high)
236        }
237    }
238
239    /// Get buffer with capacity
240    pub fn buf_with_capacity(&self, cap: usize) -> BytesVec {
241        BytesVec::with_capacity(cap)
242    }
243
244    #[inline]
245    /// Resize buffer
246    pub fn resize(&self, buf: &mut BytesVec) {
247        let remaining = buf.remaining_mut();
248        if remaining < self.low {
249            buf.reserve(self.high - remaining);
250        }
251    }
252
253    #[inline]
254    /// Release buffer, buf must be allocated from this pool
255    pub fn release(&self, buf: BytesVec) {
256        let cap = buf.capacity();
257        if cap > self.low && cap <= self.high {
258            CACHE.with(|c| {
259                c.with(self.idx, |v: &mut Vec<_>| {
260                    if v.len() < self.cache_size {
261                        v.push(buf);
262                    }
263                })
264            })
265        }
266    }
267}
268
269struct LocalCache {
270    cache: UnsafeCell<Vec<Vec<BytesVec>>>,
271}
272
273impl LocalCache {
274    fn new() -> Self {
275        Self {
276            cache: UnsafeCell::new(Vec::with_capacity(16)),
277        }
278    }
279
280    fn with<F, R>(&self, idx: usize, f: F) -> R
281    where
282        F: FnOnce(&mut Vec<BytesVec>) -> R,
283    {
284        let cache = unsafe { &mut *self.cache.get() };
285
286        while cache.len() <= idx {
287            cache.push(Vec::new())
288        }
289        f(&mut cache[idx])
290    }
291}