Skip to main content

ntex_io/
cfg.rs

1use std::cell::UnsafeCell;
2
3use ntex_bytes::{BytePageSize, BytesMut, buf::BufMut};
4use ntex_service::cfg::{CfgContext, Configuration};
5use ntex_util::{time::Millis, time::Seconds};
6
7const DEFAULT_CACHE_SIZE: usize = 128;
8const DEFAULT_HIGH: usize = 16 * 1024 - 24;
9const DEFAULT_LOW: usize = 512 + 24;
10const DEFAULT_HALF: usize = (16 * 1024 - 24) / 2;
11
12thread_local! {
13    static CACHE: LocalCache = LocalCache::new();
14}
15
16#[derive(Debug)]
17/// Base io configuration
18pub struct IoConfig {
19    connect_timeout: Millis,
20    keepalive_timeout: Seconds,
21    disconnect_timeout: Seconds,
22    frame_read_rate: Option<FrameReadRate>,
23
24    // io read/write cache and params
25    read_buf: BufConfig,
26    write_buf: BufConfig,
27    write_page_size: BytePageSize,
28    write_buf_threshold: usize,
29
30    // shared config
31    pub(crate) config: CfgContext,
32}
33
34impl Default for IoConfig {
35    fn default() -> Self {
36        IoConfig::new()
37    }
38}
39
40impl Configuration for IoConfig {
41    const NAME: &str = "IO Configuration";
42
43    fn ctx(&self) -> &CfgContext {
44        &self.config
45    }
46
47    fn set_ctx(&mut self, ctx: CfgContext) {
48        self.read_buf.idx = ctx.id();
49        self.write_buf.idx = ctx.id();
50        self.config = ctx;
51    }
52}
53
54#[derive(Copy, Clone, Debug)]
55pub struct FrameReadRate {
56    pub timeout: Seconds,
57    pub max_timeout: Seconds,
58    pub rate: u32,
59}
60
61#[derive(Copy, Clone, Debug)]
62pub struct BufConfig {
63    pub high: usize,
64    pub low: usize,
65    pub half: usize,
66    idx: usize,
67    first: bool,
68    cache_size: usize,
69}
70
71impl IoConfig {
72    #[inline]
73    #[must_use]
74    /// Create new config object
75    pub fn new() -> IoConfig {
76        let config = CfgContext::default();
77        let idx = config.id();
78
79        IoConfig {
80            config,
81            connect_timeout: Millis::ZERO,
82            keepalive_timeout: Seconds(0),
83            disconnect_timeout: Seconds(1),
84            frame_read_rate: None,
85
86            read_buf: BufConfig {
87                idx,
88                high: DEFAULT_HIGH,
89                low: DEFAULT_LOW,
90                half: DEFAULT_HALF,
91                first: true,
92                cache_size: DEFAULT_CACHE_SIZE,
93            },
94            write_buf: BufConfig {
95                idx,
96                high: DEFAULT_HIGH,
97                low: DEFAULT_LOW,
98                half: DEFAULT_HALF,
99                first: false,
100                cache_size: DEFAULT_CACHE_SIZE,
101            },
102            write_page_size: BytePageSize::Size16,
103            write_buf_threshold: BytePageSize::Size16.half_capacity(),
104        }
105    }
106
107    #[inline]
108    /// Get tag
109    pub fn tag(&self) -> &str {
110        self.config.tag()
111    }
112
113    #[inline]
114    /// Get connect timeout
115    pub fn connect_timeout(&self) -> Millis {
116        self.connect_timeout
117    }
118
119    #[inline]
120    /// Get keep-alive timeout
121    pub fn keepalive_timeout(&self) -> Seconds {
122        self.keepalive_timeout
123    }
124
125    #[inline]
126    /// Get disconnect timeout
127    pub fn disconnect_timeout(&self) -> Seconds {
128        self.disconnect_timeout
129    }
130
131    #[inline]
132    /// Get frame read params
133    pub fn frame_read_rate(&self) -> Option<&FrameReadRate> {
134        self.frame_read_rate.as_ref()
135    }
136
137    #[inline]
138    /// Get read buffer parameters
139    pub fn read_buf(&self) -> &BufConfig {
140        &self.read_buf
141    }
142
143    #[inline]
144    /// Get write buffer parameters
145    pub fn write_buf(&self) -> &BufConfig {
146        &self.write_buf
147    }
148
149    #[inline]
150    /// Get write page size
151    pub fn write_page_size(&self) -> BytePageSize {
152        self.write_page_size
153    }
154
155    #[inline]
156    /// The write buffer threshold that triggers earlier sending.
157    pub fn write_buf_threshold(&self) -> usize {
158        self.write_buf_threshold
159    }
160
161    /// Set connect timeout in seconds.
162    ///
163    /// To disable timeout set value to 0.
164    ///
165    /// By default connect timeout is disabled.
166    #[must_use]
167    pub fn set_connect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
168        self.connect_timeout = timeout.into();
169        self
170    }
171
172    /// Set keep-alive timeout in seconds.
173    ///
174    /// To disable timeout set value to 0.
175    ///
176    /// By default keep-alive timeout is disabled.
177    #[must_use]
178    pub fn set_keepalive_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
179        self.keepalive_timeout = timeout.into();
180        self
181    }
182
183    /// Set connection disconnect timeout.
184    ///
185    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
186    /// within this time, the connection get dropped.
187    ///
188    /// To disable timeout set value to 0.
189    ///
190    /// By default disconnect timeout is set to 1 seconds.
191    #[must_use]
192    pub fn set_disconnect_timeout<T: Into<Seconds>>(mut self, timeout: T) -> Self {
193        self.disconnect_timeout = timeout.into();
194        self
195    }
196
197    /// Set read rate parameters for single frame.
198    ///
199    /// Set read timeout, max timeout and rate for reading payload. If the client
200    /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
201    /// But no more than `max_timeout` timeout.
202    ///
203    /// By default frame read rate is disabled.
204    #[must_use]
205    pub fn set_frame_read_rate(
206        mut self,
207        timeout: Seconds,
208        max_timeout: Seconds,
209        rate: u32,
210    ) -> Self {
211        self.frame_read_rate = Some(FrameReadRate {
212            timeout,
213            max_timeout,
214            rate,
215        });
216        self
217    }
218
219    /// Set read buffer parameters.
220    ///
221    /// By default high watermark is set to 16Kb, low watermark 1kb.
222    #[must_use]
223    pub fn set_read_buf(
224        mut self,
225        high_watermark: usize,
226        low_watermark: usize,
227        cache_size: usize,
228    ) -> Self {
229        self.read_buf.cache_size = cache_size;
230        self.read_buf.high = high_watermark;
231        self.read_buf.low = low_watermark;
232        self.read_buf.half = high_watermark >> 1;
233        self
234    }
235
236    /// Set write buffer page size.
237    ///
238    /// By default page size is set to 16kb.
239    #[must_use]
240    pub fn set_write_page_size(mut self, size: BytePageSize) -> Self {
241        self.write_page_size = size;
242        self
243    }
244
245    /// Sets the write buffer threshold.
246    ///
247    /// The app encodes data in response to incoming data,
248    /// continuing to fill the write buffer until all data
249    /// has been processed. Only then can the runtime wake
250    /// the write task to send the buffered data.
251    ///
252    /// By that time, the buffer may have accumulated a large
253    /// amount of data, causing it to be sent in large bursts,
254    /// which introduces latency. To prevent this behavior and
255    /// flatten data delivery to the peer, ntex's io can initiate
256    /// out-of-order writes based on a configured threshold.
257    ///
258    /// Set `0` to disable send-buf.
259    #[must_use]
260    pub fn set_write_buf_threshold(mut self, size: usize) -> Self {
261        self.write_buf_threshold = size;
262        self
263    }
264
265    /// Set write buffer parameters.
266    ///
267    /// By default high watermark is set to 16Kb, low watermark 1kb.
268    #[must_use]
269    pub fn set_write_buf(
270        mut self,
271        high_watermark: usize,
272        low_watermark: usize,
273        cache_size: usize,
274    ) -> Self {
275        self.write_buf.cache_size = cache_size;
276        self.write_buf.high = high_watermark;
277        self.write_buf.low = low_watermark;
278        self.write_buf.half = high_watermark >> 1;
279        self
280    }
281}
282
283impl BufConfig {
284    #[inline]
285    /// Get buffer
286    pub fn get(&self) -> BytesMut {
287        if let Some(buf) =
288            CACHE.with(|c| c.with(self.idx, self.first, |c: &mut Vec<_>| c.pop()))
289        {
290            buf
291        } else {
292            BytesMut::with_capacity(self.high)
293        }
294    }
295
296    /// Get buffer with capacity
297    pub fn buf_with_capacity(&self, cap: usize) -> BytesMut {
298        BytesMut::with_capacity(cap)
299    }
300
301    #[inline]
302    /// Resize buffer
303    pub fn resize(&self, buf: &mut BytesMut) {
304        if buf.remaining_mut() < self.low {
305            self.resize_min(buf, self.high);
306        }
307    }
308
309    #[inline]
310    /// Resize buffer
311    pub fn resize_min(&self, buf: &mut BytesMut, size: usize) {
312        let mut avail = buf.remaining_mut();
313        if avail < size {
314            let mut new_cap = buf.capacity();
315            while avail < size {
316                avail += self.high;
317                new_cap += self.high;
318            }
319            buf.reserve_capacity(new_cap);
320        }
321    }
322
323    #[inline]
324    /// Release buffer, buf must be allocated from this pool
325    pub fn release(&self, mut buf: BytesMut) {
326        let cap = buf.capacity();
327        if cap > self.low && cap <= self.high {
328            CACHE.with(|c| {
329                c.with(self.idx, self.first, |v: &mut Vec<_>| {
330                    if v.len() < self.cache_size {
331                        buf.clear();
332                        v.push(buf);
333                    }
334                });
335            });
336        }
337    }
338}
339
340struct LocalCache {
341    cache: UnsafeCell<Vec<(Vec<BytesMut>, Vec<BytesMut>)>>,
342}
343
344impl LocalCache {
345    fn new() -> Self {
346        Self {
347            cache: UnsafeCell::new(Vec::with_capacity(16)),
348        }
349    }
350
351    fn with<F, R>(&self, idx: usize, first: bool, f: F) -> R
352    where
353        F: FnOnce(&mut Vec<BytesMut>) -> R,
354    {
355        let cache = unsafe { &mut *self.cache.get() };
356
357        while cache.len() <= idx {
358            cache.push((Vec::new(), Vec::new()));
359        }
360        if first {
361            f(&mut cache[idx].0)
362        } else {
363            f(&mut cache[idx].1)
364        }
365    }
366}