1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
//! Channel multiplexer configuration.

use std::time::Duration;

use super::msg::MAX_MSG_LENGTH;

/// Behavior when ports are exhausted and a connect is requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum PortsExhausted {
    /// Immediately fail connect request.
    Fail,
    /// Wait for a port to become available with an optional timeout.
    Wait(Option<Duration>),
}

/// Channel multiplexer configuration.
///
/// In most cases the default configuration ([Cfg::default]) is recommended, since it
/// provides a good balance between throughput, memory usage and latency.
///
/// In case of unsatisfactory performance (low throughput) your first step should be
/// to increase the [receive buffer size](Self::receive_buffer).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Cfg {
    /// Time after which connection is closed when no data is.
    ///
    /// Pings are send automatically when this is enabled and no data is transmitted.
    /// By default this is 60 seconds.
    pub connection_timeout: Option<Duration>,
    /// Maximum number of open ports.
    ///
    /// This must not exceed 2^31 = 2147483648.
    /// By default this is 16384.
    pub max_ports: u32,
    /// Default behavior when ports are exhausted and a connect is requested.
    ///
    /// This can be overridden on a per-request basis.
    /// By default this is wait with a timeout of 60 seconds.
    pub ports_exhausted: PortsExhausted,
    /// Maximum size of received data per message in bytes.
    ///
    /// [Receiver::recv_chunk](super::Receiver::recv_chunk) is not affected by this limit.
    ///
    /// [Remote channels](crate::rch) will spawn a serialization and deserialization thread
    /// to transmit and receive data in chunks if this limit is reached.
    /// Thus, this does not limit the maximum serialized data size for remote channels
    /// but will incur a small performance cost for inter-thread communication when exceeded.
    ///
    /// This can be configured on a per-receiver basis.
    /// By default this is 512 kB.
    pub max_data_size: usize,
    /// Maximum port requests received per message.
    ///
    /// For [remote channels](crate::rch) this configures how many more ports than expected
    /// (from the data type) can be received per message.
    /// This is useful for compatibility when the receiver has an older version of a struct
    /// type with less fields containing ports.
    ///
    /// This can be configured on a per-receiver basis.
    /// By default this is 128.
    pub max_received_ports: usize,
    /// Size of a chunk of data in bytes.
    ///
    /// By default this is 16 kB.
    /// This must be at least 4 bytes.
    /// This must not exceed 2^32 - 16 = 4294967279.
    pub chunk_size: u32,
    /// Size of receive buffer of each port in bytes.
    ///
    /// This controls the maximum amout of in-flight data per port, that is data on the transport
    /// plus received but yet unprocessed data.
    ///
    /// Increase this value if the throughput (bytes per second) is significantly
    /// lower than you would expect from your underlying transport connection.
    ///
    /// By default this is 512 kB.
    /// This must be at least 4 bytes.
    pub receive_buffer: u32,
    /// Length of global send queue.
    /// Each element holds a chunk.
    ///
    /// This limits the number of chunks sendable by using
    /// [Sender::try_send](super::Sender::try_send).
    /// It will not affect [remote channels](crate::rch).
    ///
    /// By default this is 16.
    /// This must not be zero.
    pub shared_send_queue: usize,
    /// Length of transport send queue.
    /// Each element holds a chunk.
    ///
    /// Raising this may improve performance but might incur a slight increase in latency.
    /// For minimum latency this should be set to 1.
    ///
    /// By default this is 16.
    /// This must not be zero.
    pub transport_send_queue: usize,
    /// Length of transport receive queue.
    /// Each element holds a chunk.
    ///
    /// Raising this may improve performance but might incur a slight increase in latency.
    /// For minimum latency this should be set to 1.
    ///
    /// By default this is 16.
    /// This must not be zero.
    pub transport_receive_queue: usize,
    /// Maximum number of outstanding connection requests.
    ///
    /// By default this is 128.
    /// This must not be zero.
    pub connect_queue: u16,
    /// Time to wait when no data is available for sending before flushing the send buffer
    /// of the connection.
    ///
    /// By default this is 20 milliseconds.
    pub flush_delay: Duration,
    #[doc(hidden)]
    pub _non_exhaustive: (),
}

impl Default for Cfg {
    /// The default configuration provides a balance between throughput,
    /// memory usage and latency.
    fn default() -> Self {
        Self {
            connection_timeout: Some(Duration::from_secs(60)),
            max_ports: 16_384,
            ports_exhausted: PortsExhausted::Wait(Some(Duration::from_secs(60))),
            max_data_size: 524_288,
            max_received_ports: 128,
            chunk_size: 16_384,
            receive_buffer: 524_288,
            shared_send_queue: 16,
            transport_send_queue: 16,
            transport_receive_queue: 16,
            connect_queue: 128,
            flush_delay: Duration::from_millis(20),
            _non_exhaustive: (),
        }
    }
}

impl Cfg {
    /// Checks the configuration.
    ///
    /// # Panics
    /// Panics if the configuration is invalid.
    pub(crate) fn check(&self) {
        if self.max_ports > 2u32.pow(31) {
            panic!("maximum ports must not exceed 2^31");
        }

        if self.chunk_size < 4 {
            panic!("chunk size must be at least 4");
        }

        if self.receive_buffer < 4 {
            panic!("receive buffer must be at least 4 bytes");
        }

        if self.shared_send_queue == 0 {
            panic!("shared send queue length must not be zero");
        }

        if self.transport_send_queue == 0 {
            panic!("transport send queue length must not be zero");
        }

        if self.transport_receive_queue == 0 {
            panic!("transport receive queue length must not be zero");
        }

        if self.connect_queue == 0 {
            panic!("connect queue length must not be zero");
        }
    }

    /// Returns the maximum size of a frame that can be received by a
    /// channel multiplexer using this configuration.
    ///
    /// # Panics
    /// Panics if the configuration is invalid.
    pub fn max_frame_length(&self) -> u32 {
        (MAX_MSG_LENGTH as u32).checked_add(self.chunk_size).expect("maximum frame size exceeds u32::MAX")
    }

    /// Configuration that is balanced between memory usage, latency and throughput.
    pub fn balanced() -> Self {
        Self::default()
    }

    /// Configuration that is optimized for low memory usage and low latency
    /// but may be throughput-limited.
    pub fn compact() -> Self {
        Self {
            shared_send_queue: 1,
            transport_receive_queue: 1,
            transport_send_queue: 1,
            receive_buffer: 16_384,
            chunk_size: 4096,
            ..Default::default()
        }
    }

    /// Configuration that is throughput-optimized but may use more memory per
    /// channel and may have higher latency.
    pub fn throughput() -> Self {
        Self {
            shared_send_queue: 64,
            transport_receive_queue: 64,
            transport_send_queue: 64,
            receive_buffer: 1_048_576,
            chunk_size: 32_768,
            ..Default::default()
        }
    }
}