1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
//! Channel multiplexer configuration.
use std::time::Duration;
use super::msg::MAX_MSG_LENGTH;
/// Behavior when ports are exhausted and a connect is requested.
///
/// This is the type of [`Cfg::ports_exhausted`], which holds the default policy
/// of a channel multiplexer.
///
/// The enum is marked `#[non_exhaustive]`, so code outside this crate that matches
/// on it needs a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum PortsExhausted {
/// Immediately fail the connect request.
Fail,
/// Wait for a port to become available, with an optional timeout.
///
/// The payload is the timeout of the wait.
// NOTE(review): `None` presumably means waiting without a time limit; confirm
// against the code that consumes this value.
Wait(Option<Duration>),
}
/// Channel multiplexer configuration.
///
/// In most cases the default configuration ([Cfg::default]) is recommended, since it
/// provides a good balance between throughput, memory usage and latency.
///
/// In case of unsatisfactory performance (low throughput) your first step should be
/// to increase the [receive buffer size](Self::receive_buffer).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Cfg {
/// Time after which the connection is closed when no data is received.
///
/// Pings are sent automatically when this is enabled and no data is transmitted.
/// By default this is 60 seconds.
pub connection_timeout: Option<Duration>,
/// Maximum number of open ports.
///
/// This must not exceed 2^31 = 2147483648.
/// By default this is 16384.
pub max_ports: u32,
/// Default behavior when ports are exhausted and a connect is requested.
///
/// This can be overridden on a per-request basis.
/// By default this is wait with a timeout of 60 seconds.
pub ports_exhausted: PortsExhausted,
/// Maximum size of received data per message in bytes.
///
/// [Receiver::recv_chunk](super::Receiver::recv_chunk) is not affected by this limit.
///
/// [Remote channels](crate::rch) will spawn a serialization and deserialization thread
/// to transmit and receive data in chunks if this limit is reached.
/// Thus, this does not limit the maximum serialized data size for remote channels
/// but will incur a small performance cost for inter-thread communication when exceeded.
///
/// This can be configured on a per-receiver basis.
/// By default this is 512 kB.
pub max_data_size: usize,
/// Maximum port requests received per message.
///
/// For [remote channels](crate::rch) this configures how many more ports than expected
/// (from the data type) can be received per message.
/// This is useful for compatibility when the receiver has an older version of a struct
/// type with fewer fields containing ports.
///
/// This can be configured on a per-receiver basis.
/// By default this is 128.
pub max_received_ports: usize,
/// Size of a chunk of data in bytes.
///
/// By default this is 16 kB.
/// This must be at least 4 bytes.
/// This must not exceed 2^32 - 17 = 4294967279.
// NOTE(review): the effective upper bound is `u32::MAX` minus `MAX_MSG_LENGTH`, because
// `max_frame_length` adds both and requires the sum to fit into a `u32`. The number above
// matches a `MAX_MSG_LENGTH` of 16; confirm against the constant's definition.
pub chunk_size: u32,
/// Size of receive buffer of each port in bytes.
///
/// This controls the maximum amount of in-flight data per port, that is data on the transport
/// plus received but yet unprocessed data.
///
/// Increase this value if the throughput (bytes per second) is significantly
/// lower than you would expect from your underlying transport connection.
///
/// By default this is 512 kB.
/// This must be at least 4 bytes.
pub receive_buffer: u32,
/// Length of global send queue.
/// Each element holds a chunk.
///
/// This limits the number of chunks sendable by using
/// [Sender::try_send](super::Sender::try_send).
/// It will not affect [remote channels](crate::rch).
///
/// By default this is 16.
/// This must not be zero.
pub shared_send_queue: usize,
/// Length of transport send queue.
/// Each element holds a chunk.
///
/// Raising this may improve performance but might incur a slight increase in latency.
/// For minimum latency this should be set to 1.
///
/// By default this is 16.
/// This must not be zero.
pub transport_send_queue: usize,
/// Length of transport receive queue.
/// Each element holds a chunk.
///
/// Raising this may improve performance but might incur a slight increase in latency.
/// For minimum latency this should be set to 1.
///
/// By default this is 16.
/// This must not be zero.
pub transport_receive_queue: usize,
/// Maximum number of outstanding connection requests.
///
/// By default this is 128.
/// This must not be zero.
pub connect_queue: u16,
/// Time to wait when no data is available for sending before flushing the send buffer
/// of the connection.
///
/// By default this is 20 milliseconds.
pub flush_delay: Duration,
// Hidden unit marker field. Presumably it exists so that callers build values with
// `..Default::default()` and keep compiling when new fields are added; confirm intent.
#[doc(hidden)]
pub _non_exhaustive: (),
}
impl Default for Cfg {
    /// Builds the default configuration, which aims for a balance between
    /// throughput, memory usage and latency.
    fn default() -> Self {
        // The connection timeout and the wait for a free port share the
        // same one-minute default.
        let one_minute = Duration::from_secs(60);

        Self {
            // Timing.
            connection_timeout: Some(one_minute),
            ports_exhausted: PortsExhausted::Wait(Some(one_minute)),
            flush_delay: Duration::from_millis(20),
            // Limits.
            max_ports: 16 * 1024,
            max_received_ports: 128,
            connect_queue: 128,
            // Sizes in bytes: 512 kB per message and per receive buffer, 16 kB per chunk.
            max_data_size: 512 * 1024,
            receive_buffer: 512 * 1024,
            chunk_size: 16 * 1024,
            // Queue lengths, counted in chunks.
            shared_send_queue: 16,
            transport_send_queue: 16,
            transport_receive_queue: 16,
            _non_exhaustive: (),
        }
    }
}
impl Cfg {
    /// Checks the configuration.
    ///
    /// The following limits are verified:
    /// * `max_ports` does not exceed 2^31,
    /// * `chunk_size` and `receive_buffer` are at least 4,
    /// * `shared_send_queue`, `transport_send_queue`, `transport_receive_queue`
    ///   and `connect_queue` are not zero.
    ///
    /// The upper bound of `chunk_size` is not verified here; it is enforced by
    /// [`max_frame_length`](Self::max_frame_length).
    ///
    /// # Panics
    /// Panics if the configuration is invalid. The panic message names the
    /// violated limit.
    pub(crate) fn check(&self) {
        assert!(self.max_ports <= 2u32.pow(31), "maximum ports must not exceed 2^31");
        assert!(self.chunk_size >= 4, "chunk size must be at least 4");
        assert!(self.receive_buffer >= 4, "receive buffer must be at least 4 bytes");
        assert!(self.shared_send_queue != 0, "shared send queue length must not be zero");
        assert!(self.transport_send_queue != 0, "transport send queue length must not be zero");
        assert!(
            self.transport_receive_queue != 0,
            "transport receive queue length must not be zero"
        );
        assert!(self.connect_queue != 0, "connect queue length must not be zero");
    }

    /// Returns the maximum size of a frame that can be received by a
    /// channel multiplexer using this configuration.
    ///
    /// The result is `MAX_MSG_LENGTH` plus the configured `chunk_size`.
    ///
    /// # Panics
    /// Panics if the configuration is invalid or if the frame length
    /// does not fit into a `u32`.
    pub fn max_frame_length(&self) -> u32 {
        // Validate first, so that the documented "panics if the configuration is
        // invalid" contract holds instead of returning a length for a bad configuration.
        self.check();

        (MAX_MSG_LENGTH as u32)
            .checked_add(self.chunk_size)
            .expect("maximum frame size exceeds u32::MAX")
    }

    /// Configuration that is balanced between memory usage, latency and throughput.
    ///
    /// This is the same as [`Cfg::default`].
    pub fn balanced() -> Self {
        Self::default()
    }

    /// Configuration that is optimized for low memory usage and low latency
    /// but may be throughput-limited.
    ///
    /// Compared to the default, all three queues have length 1, the receive
    /// buffer is 16 kB and the chunk size is 4 kB.
    pub fn compact() -> Self {
        Self {
            shared_send_queue: 1,
            transport_receive_queue: 1,
            transport_send_queue: 1,
            receive_buffer: 16_384,
            chunk_size: 4096,
            ..Default::default()
        }
    }

    /// Configuration that is throughput-optimized but may use more memory per
    /// channel and may have higher latency.
    ///
    /// Compared to the default, all three queues have length 64, the receive
    /// buffer is 1 MB and the chunk size is 32 kB.
    pub fn throughput() -> Self {
        Self {
            shared_send_queue: 64,
            transport_receive_queue: 64,
            transport_send_queue: 64,
            receive_buffer: 1_048_576,
            chunk_size: 32_768,
            ..Default::default()
        }
    }
}