ntex_io/
cfg.rs

1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytesVec, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
/// Default value for `BufConfig::cache_size`, i.e. the default cap on how many
/// released buffers each cache list keeps.
const DEFAULT_CACHE_SIZE: usize = 128;
thread_local! {
    // Per-thread buffer cache used by `BufConfig::get` and `BufConfig::release`.
    static CACHE: LocalCache = LocalCache::new();
}
11
#[derive(Clone, Debug)]
/// Base io configuration
///
/// Holds the timeouts, the optional frame read rate and the read/write
/// buffer parameters. Values are changed through the consuming `set_*`
/// builder methods and read back through the getters of the same name.
pub struct IoConfig {
    /// Connect timeout; zero disables it (the default).
    connect_timeout: Millis,
    /// Keep-alive timeout; zero disables it (the default).
    keepalive_timeout: Seconds,
    /// Disconnect timeout; zero disables it. Defaults to 1 second.
    disconnect_timeout: Seconds,
    /// Per-frame read rate parameters; `None` (the default) means disabled.
    frame_read_rate: Option<FrameReadRate>,

    // io read/write cache and params
    /// Parameters of read buffers (`first == true` side of the cache slot).
    read_buf: BufConfig,
    /// Parameters of write buffers (`first == false` side of the cache slot).
    write_buf: BufConfig,

    // shared config
    /// Context whose `id()` is used as the cache slot index of both buffer configs.
    pub(crate) config: CfgContext,
}
27
28impl Default for IoConfig {
29    fn default() -> Self {
30        IoConfig::new()
31    }
32}
33
34impl Configuration for IoConfig {
35    const NAME: &str = "IO Configuration";
36
37    fn ctx(&self) -> &CfgContext {
38        &self.config
39    }
40
41    fn set_ctx(&mut self, ctx: CfgContext) {
42        self.config = ctx;
43        self.read_buf.idx = ctx.id();
44        self.write_buf.idx = ctx.id();
45    }
46}
47
/// Read rate parameters for a single frame.
///
/// Stored by `IoConfig::set_frame_read_rate`: if the peer sends `rate` amount
/// of data within `timeout`, the timeout is extended by `timeout`, but never
/// beyond `max_timeout`.
#[derive(Copy, Clone, Debug)]
pub struct FrameReadRate {
    /// Base read timeout, also the amount by which the timeout is extended.
    pub timeout: Seconds,
    /// Upper bound for the extended timeout.
    pub max_timeout: Seconds,
    /// Amount of data that must arrive within `timeout` to extend it.
    pub rate: u32,
}
54
/// Buffer parameters for one direction (read or write) of an io object.
///
/// Also acts as the handle to the per-thread buffer cache: `get` and
/// `release` use `idx` and `first` to pick the cache list to work on.
#[derive(Copy, Clone, Debug)]
pub struct BufConfig {
    /// High watermark; also the capacity of buffers newly allocated by `get`.
    pub high: usize,
    /// Low watermark; `resize` reserves more space once the remaining
    /// writable space drops below this value.
    pub low: usize,
    /// Half of the high watermark (`high >> 1` when set through `IoConfig`).
    pub half: usize,
    /// Index of the slot in the thread-local cache; taken from `CfgContext::id()`.
    idx: usize,
    /// Selects the list inside the cache slot: `true` for the read config,
    /// `false` for the write config.
    first: bool,
    /// Maximum number of buffers `release` keeps in the selected cache list.
    cache_size: usize,
}
64
65impl IoConfig {
66    #[inline]
67    #[allow(clippy::new_without_default)]
68    /// Create new config object
69    pub fn new() -> IoConfig {
70        let config = CfgContext::default();
71
72        IoConfig {
73            config,
74            connect_timeout: Millis::ZERO,
75            keepalive_timeout: Seconds(0),
76            disconnect_timeout: Seconds(1),
77            frame_read_rate: None,
78
79            read_buf: BufConfig {
80                high: 16 * 1024,
81                low: 1024,
82                half: 8 * 1024,
83                idx: config.id(),
84                first: true,
85                cache_size: DEFAULT_CACHE_SIZE,
86            },
87            write_buf: BufConfig {
88                high: 16 * 1024,
89                low: 1024,
90                half: 8 * 1024,
91                idx: config.id(),
92                first: false,
93                cache_size: DEFAULT_CACHE_SIZE,
94            },
95        }
96    }
97
98    #[inline]
99    /// Get tag
100    pub fn tag(&self) -> &str {
101        self.config.tag()
102    }
103
104    #[inline]
105    /// Get connect timeout
106    pub fn connect_timeout(&self) -> Millis {
107        self.connect_timeout
108    }
109
110    #[inline]
111    /// Get keep-alive timeout
112    pub fn keepalive_timeout(&self) -> Seconds {
113        self.keepalive_timeout
114    }
115
116    #[inline]
117    /// Get disconnect timeout
118    pub fn disconnect_timeout(&self) -> Seconds {
119        self.disconnect_timeout
120    }
121
122    #[inline]
123    /// Get frame read params
124    pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
125        self.frame_read_rate.as_ref()
126    }
127
128    #[inline]
129    /// Get read buffer parameters
130    pub fn read_buf(&self) -> &BufConfig {
131        &self.read_buf
132    }
133
134    #[inline]
135    /// Get write buffer parameters
136    pub fn write_buf(&self) -> &BufConfig {
137        &self.write_buf
138    }
139
140    /// Set connect timeout in seconds.
141    ///
142    /// To disable timeout set value to 0.
143    ///
144    /// By default connect timeout is disabled.
145    pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
146        self.connect_timeout = timeout.into();
147        self
148    }
149
150    /// Set keep-alive timeout in seconds.
151    ///
152    /// To disable timeout set value to 0.
153    ///
154    /// By default keep-alive timeout is disabled.
155    pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
156        self.keepalive_timeout = timeout.into();
157        self
158    }
159
160    /// Set connection disconnect timeout.
161    ///
162    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
163    /// within this time, the connection get dropped.
164    ///
165    /// To disable timeout set value to 0.
166    ///
167    /// By default disconnect timeout is set to 1 seconds.
168    pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
169        self.disconnect_timeout = timeout.into();
170        self
171    }
172
173    /// Set read rate parameters for single frame.
174    ///
175    /// Set read timeout, max timeout and rate for reading payload. If the client
176    /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
177    /// But no more than `max_timeout` timeout.
178    ///
179    /// By default frame read rate is disabled.
180    pub fn set_frame_read_rate(
181        mut self,
182        timeout: Seconds,
183        max_timeout: Seconds,
184        rate: u32,
185    ) -> Self {
186        self.frame_read_rate = Some(FrameReadRate {
187            timeout,
188            max_timeout,
189            rate,
190        });
191        self
192    }
193
194    /// Set read buffer parameters.
195    ///
196    /// By default high watermark is set to 16Kb, low watermark 1kb.
197    pub fn set_read_buf(
198        mut self,
199        high_watermark: usize,
200        low_watermark: usize,
201        cache_size: usize,
202    ) -> Self {
203        self.read_buf.cache_size = cache_size;
204        self.read_buf.high = high_watermark;
205        self.read_buf.low = low_watermark;
206        self.read_buf.half = high_watermark >> 1;
207        self
208    }
209
210    /// Set write buffer parameters.
211    ///
212    /// By default high watermark is set to 16Kb, low watermark 1kb.
213    pub fn set_write_buf(
214        mut self,
215        high_watermark: usize,
216        low_watermark: usize,
217        cache_size: usize,
218    ) -> Self {
219        self.write_buf.cache_size = cache_size;
220        self.write_buf.high = high_watermark;
221        self.write_buf.low = low_watermark;
222        self.write_buf.half = high_watermark >> 1;
223        self
224    }
225}
226
227impl BufConfig {
228    #[inline]
229    /// Get buffer
230    pub fn get(&self) -> BytesVec {
231        if let Some(mut buf) =
232            CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
233        {
234            buf.clear();
235            buf
236        } else {
237            BytesVec::with_capacity(self.high)
238        }
239    }
240
241    /// Get buffer with capacity
242    pub fn buf_with_capacity(&self, cap: usize) -> BytesVec {
243        BytesVec::with_capacity(cap)
244    }
245
246    #[inline]
247    /// Resize buffer
248    pub fn resize(&self, buf: &mut BytesVec) {
249        let remaining = buf.remaining_mut();
250        if remaining < self.low {
251            buf.reserve(self.high - remaining);
252        }
253    }
254
255    #[inline]
256    /// Release buffer, buf must be allocated from this pool
257    pub fn release(&self, buf: BytesVec) {
258        let cap = buf.capacity();
259        if cap > self.low && cap <= self.high {
260            CACHE.with(|c| {
261                c.with(self.idx, self.first, |v: &mut Vec<_>| {
262                    if v.len() < self.cache_size {
263                        v.push(buf);
264                    }
265                })
266            })
267        }
268    }
269}
270
271struct LocalCache {
272    cache: UnsafeCell<Vec<(Vec<BytesVec>, Vec<BytesVec>)>>,
273}
274
275impl LocalCache {
276    fn new() -> Self {
277        Self {
278            cache: UnsafeCell::new(Vec::with_capacity(16)),
279        }
280    }
281
282    fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
283    where
284        F: FnOnce(&mut Vec<BytesVec>) -> R,
285    {
286        let cache = unsafe { &mut *self.cache.get() };
287
288        while cache.len() <= idx {
289            cache.push((Vec::new(), Vec::new()))
290        }
291        if first {
292            f(&mut cache[idx].0)
293        } else {
294            f(&mut cache[idx].1)
295        }
296    }
297}