reactive_messaging/config.rs
1//! Contains constants and other configuration information affecting default & fixed behaviors of this library
2
3use std::{ops::RangeInclusive, num::NonZeroU8};
4use std::time::Duration;
5use reactive_mutiny::prelude::Instruments;
6//use strum_macros::FromRepr;
7
8
9// /// Specifies the channels (queues) from `reactive-mutiny` that may be used to send/receive data.\
10// /// On different hardware, the performance characteristics may vary.
11// #[derive(Debug,PartialEq,FromRepr)]
12// pub enum Channels<const CONFIG: u64> {
13// Atomic { uni: Option<>, sender: Option<> },
14// FullSync { uni: None, sender: None },
15// Crossbeam {},
16// }
17
18/// Specifies how to behave when communication failures happen
19#[derive(Debug,PartialEq)]
20pub enum RetryingStrategies {
21
22 /// Simply ignore full buffer failures denials of sending & receiving messages, without retrying nor dropping the connection.\
23 /// This option is acceptable when missing messages don't disrupt the communications and when low latencies / realtime-ish behavior is required.\
24 /// Set [ConstConfig::sender_buffer] & [ConstConfig::receiver_buffer] accordingly.
25 DoNotRetry,
26
27 /// Drops the connection on "buffer is full" errors, also without retrying
28 EndCommunications,
29
30 /// Retries, in case of "buffer is full" errors, ending the communications if success still can't be achieved.\
31 /// Uses an Exponential Backoff strategy with factor 2.526 and 20% jitter, giving the milliseconds to sleep between,
32 /// at most, the given number of attempts.\
33 /// The total retrying time would be the sum of the geometric progression: (-1+2.526^n)/(1.526) -- in milliseconds.\
34 /// Example: for up to 5 minutes retrying, use 14 attempts.
35 RetryWithBackoffUpTo(u8),
36
37 /// Retries, in case of "buffer is full" errors, ending the communications if success still can't be achieved
38 /// during the specified milliseconds -- during which retrying will be performed in a pool loop, yielding
39 /// to tokio before each attempt.\
40 /// Use this option if low latency is desirable -- but see also [Self::RetrySleepingArithmetically]
41 RetryYieldingForUpToMillis(u8),
42
43 /// Deprecated. Do not use -- to be replaced or removed, as spinning doesn't make sense in this lib
44 RetrySpinningForUpToMillis(u8),
45 // /// reconnect if dropped? this may go as normal parameter... and on the client only
46}
47impl RetryingStrategies {
48 /// requires 3+8=11 bits to represent the data; reverse of [Self::from_repr()]
49 const fn as_repr(&self) -> u16 {
50 match self {
51 Self::DoNotRetry => 0,
52 Self::EndCommunications => 1,
53 Self::RetryWithBackoffUpTo(n) => 2 | (*n as u16) << 3,
54 Self::RetryYieldingForUpToMillis(n) => 3 | (*n as u16) << 3,
55 Self::RetrySpinningForUpToMillis(n) => 4 | (*n as u16) << 3,
56 }
57 }
58 /// reverse of [Self::from_repr()]
59 const fn from_repr(repr: u16) -> Self {
60 let (variant, n) = (repr & 7, repr >> 3);
61 match variant {
62 0 => Self::DoNotRetry,
63 1 => Self::EndCommunications,
64 2 => Self::RetryWithBackoffUpTo(n as u8),
65 3 => Self::RetryYieldingForUpToMillis(n as u8),
66 4 => Self::RetrySpinningForUpToMillis(n as u8),
67 _ => unreachable!(), // If this errors out, did a new enum member was added?
68 }
69 }
70}
71
72/// Socket options for the local peer to be set when the connection is established
73#[derive(Debug,PartialEq)]
74pub struct SocketOptions {
75 /// Also known as time-to-live (TTL), specifies how many hops may relay an outgoing packet before it being dropped and an error being returned.\
76 /// If NonZero, will cause the socket configuration function to be called with that value.
77 pub hops_to_live: Option<NonZeroU8>,
78 /// If specified, must be a power of 2 with the number of milliseconds to wait for any unsent messages when closing the connection.\
79 /// In Linux, defaults to 0.
80 pub linger_millis: Option<u32>,
81 /// Set this to `true` if lower latency is preferred over throughput; `false` (default on Linux) to use all the available bandwidth
82 /// (sending full packets, waiting up to 200ms for fulfillment).\
83 /// `None` will leave it as the system's default -- in Linux, false.
84 /// Some hints:
85 /// - The peer reporting events may prefer to set it to `true`;
86 /// - The other peer, receiving events from many, many peers and sending messages that won't be used for decision-making, may set it to `false`
87 pub no_delay: Option<bool>,
88}
89impl SocketOptions {
90 /// requires 8+(1+5)+(1+1)=16 bits to represent the data; reverse of [Self::from_repr()]
91 const fn as_repr(&self) -> u32 {
92 (self.no_delay.is_some() as u32) // << 0 // no delay flag
93 | (unwrap_bool_or_default(self.no_delay) as u32) << 1 // no delay data
94 | (self.linger_millis.is_some() as u32) << 2 // linger flag
95 | if let Some(linger_millis) = self.linger_millis {
96 if linger_millis > 0 {
97 set_bits_from_power_of_2_u32(0, 3..=7, linger_millis) as u32 // 3..=7 // linger data
98 } else {
99 0
100 }
101 } else {
102 0
103 }
104 | (unwrap_non_zero_u8_or_default(self.hops_to_live) as u32) << 8 // hops
105 }
106 /// reverse of [Self::from_repr()]
107 const fn from_repr(repr: u32) -> Self {
108 let (no_delay_flag, no_delay_data, linger_flag, linger_data, hops) = (
109 (repr & (1 << 0) ) > 0, // no delay flag
110 (repr & (1 << 1) ) > 0, // no delay data
111 (repr & (1 << 2) ) > 0, // linger flag
112 get_power_of_2_u32_bits(repr as u64, 3..=7), // power of 32 bits linger data
113 (repr & (( (!0u8) as u32) << 8) ) >> 8, // raw 8 bits hops data
114 );
115 Self {
116 hops_to_live: if hops > 0 { NonZeroU8::new(hops as u8) } else { None },
117 linger_millis: if linger_flag { Some(linger_data) } else { None },
118 no_delay: if no_delay_flag { Some(no_delay_data) } else { None },
119 }
120 }
121}
122
123
124/// Implements something that could be called the "Zero-Cost Const Configuration Pattern", that produces a `usize`
125/// whose goal is to be the only const parameter of a generic struct (avoiding the alternative of bloating it with several const params).\
126/// When using the const "query functions" defined here in `if`s, the compiler will have the opportunity to cancel out any unreachable code (zero-cost abstraction).\
127/// Some commonly used combinations may be pre-defined in some enum variants, but you may always build unmapped possibilities through [Self::custom()].\
128/// Usage examples:
129/// ```nocompile
130/// see bellow
131#[derive(Debug,PartialEq)]
132pub struct ConstConfig {
133 /// Pre-allocates the sender/receiver buffers to this value (power of 2).
134 /// Setting it wisely may economize some `realloc` calls
135 pub msg_size_hint: u32,
136 /// How many messages (per peer) may be enqueued for output (power of 2)
137 /// before operations start to fail
138 pub sender_buffer: u32,
139 /// How many messages (per peer) may be enqueued for processing (power of 2)
140 /// before operations start to fail
141 pub receiver_buffer: u32,
142 /// How many milliseconds to wait when flushing messages out to a to-be-closed connection.
143 pub flush_timeout_millis: u16,
144 /// Specifies what to do when operations fail (full buffers / connection droppings)
145 pub retrying_strategy: RetryingStrategies,
146 /// Messes with the low level (system) socket options
147 pub socket_options: SocketOptions,
148 // /// Allows changing the backing queue for the sender/receiver buffers
149 // pub channel: Channels,
150 /// Allows changing the Stream executor options in regard to logging & collected/reported metrics
151 pub executor_instruments: /*reactive_mutiny::*/Instruments,
152}
153
154#[warn(non_snake_case)]
155impl ConstConfig {
156
157 #![allow(non_snake_case)] // some consts accepts parameters... _/o\_
158
159 // the consts here determine what bits they use
160 // and may also specify ranges for store data (rather than just flags)
161
162 /// u32_value = 2^n
163 const MSG_SIZE_HINT: RangeInclusive<usize> = 0..=4;
164 /// u32_value = 2^n
165 const SENDER_BUFFER: RangeInclusive<usize> = 5..=9;
166 /// u32_value = 2^n
167 const RECEIVER_BUFFER: RangeInclusive<usize> = 10..=14;
168 /// u16_value = 2^n
169 const FLUSH_TIMEOUT_MILLIS: RangeInclusive<usize> = 15..=18;
170 /// One of [RetryingStrategies], converted by [RetryingStrategies::as_repr()]
171 const RETRYING_STRATEGY: RangeInclusive<usize> = 19..=29;
172 /// One of [SocketOptions], converted by [SocketOptions::as_repr()]
173 const SOCKET_OPTIONS: RangeInclusive<usize> = 30..=45;
174 // /// The channel types the synthatic-sugar macros should instantiate
175 // const CHANNEL: RangeInclusive<usize> = 46..=48;
176 /// The 8 bits from `reactive-mutiny`
177 const EXECUTOR_INSTRUMENTS: RangeInclusive<usize> = 49..=57;
178
179
180 /// Contains sane & performant defaults.\
181 /// Usage example:
182 /// ```nocompile
183 /// const CONFIG: ConstConfig = ConstConfig {
184 /// receiver_buffer: 1024,
185 /// ..ConstConfig::default()
186 /// };
187 pub const fn default() -> ConstConfig {
188 ConstConfig {
189 msg_size_hint: 1024,
190 sender_buffer: 1024,
191 receiver_buffer: 1024,
192 flush_timeout_millis: 256,
193 retrying_strategy: RetryingStrategies::RetryWithBackoffUpTo(20),
194 socket_options: SocketOptions { hops_to_live: NonZeroU8::new(255), linger_millis: Some(128), no_delay: Some(true) },
195 // channel: Channels::Atomic,
196 executor_instruments: Instruments::from(Instruments::NoInstruments.into()),
197 }
198 }
199
200 /// For use when instantiating a generic struct that uses the "Const Config Pattern"
201 /// -- when choosing a pre-defined configuration.\
202 /// See also [Self::custom()].\
203 /// Example:
204 /// ```nocompile
205 /// see bellow
206 pub const fn into(self) -> u64 {
207 let mut config = 0u64;
208 config = set_bits_from_power_of_2_u32(config, Self::MSG_SIZE_HINT, self.msg_size_hint);
209 config = set_bits_from_power_of_2_u32(config, Self::SENDER_BUFFER, self.sender_buffer);
210 config = set_bits_from_power_of_2_u32(config, Self::RECEIVER_BUFFER, self.receiver_buffer);
211 config = set_bits_from_power_of_2_u16(config, Self::FLUSH_TIMEOUT_MILLIS, self.flush_timeout_millis);
212 let retrying_strategy_repr = self.retrying_strategy.as_repr();
213 config = set_bits(config, Self::RETRYING_STRATEGY, retrying_strategy_repr as u64);
214 let socket_options_repr = self.socket_options.as_repr();
215 config = set_bits(config, Self::SOCKET_OPTIONS, socket_options_repr as u64);
216 // let channel_repr = self.channel as u8;
217 // config = set_bits(config, Self::CHANNEL, channel_repr as u64);
218 let executor_instruments_repr = self.executor_instruments.into();
219 config = set_bits(config, Self::EXECUTOR_INSTRUMENTS, executor_instruments_repr as u64);
220 config
221 }
222
223 /// Builds [Self] from the generic `const CONFIGS: usize` parameter used in structs
224 /// by the "Const Config Pattern"
225 pub const fn from(config: u64) -> Self {
226 let msg_size_hint = get_power_of_2_u32_bits(config, Self::MSG_SIZE_HINT);
227 let sender_buffer = get_power_of_2_u32_bits(config, Self::SENDER_BUFFER);
228 let receiver_buffer = get_power_of_2_u32_bits(config, Self::RECEIVER_BUFFER);
229 let flush_timeout_millis = get_power_of_2_u16_bits(config, Self::FLUSH_TIMEOUT_MILLIS);
230 let retrying_strategy_repr = get_bits(config, Self::RETRYING_STRATEGY);
231 let socket_options_repr = get_bits(config, Self::SOCKET_OPTIONS);
232 // let channel_repr = get_bits(config, Self::CHANNEL);
233 let executor_instruments_repr = get_bits(config, Self::EXECUTOR_INSTRUMENTS);
234 Self {
235 msg_size_hint,
236 flush_timeout_millis,
237 sender_buffer,
238 receiver_buffer,
239 retrying_strategy: RetryingStrategies::from_repr(retrying_strategy_repr as u16),
240 socket_options: SocketOptions::from_repr(socket_options_repr as u32),
241 // channel: if let Some(channel) = Channels::from_repr(channel_repr as usize) {channel} else {Channels::Atomic},
242 executor_instruments: Instruments::from(executor_instruments_repr as usize),
243 }
244 }
245
246 // query functions for business logic configuration attributes
247 //////////////////////////////////////////////////////////////
248 // to be used by the struct in which the generic `const CONFIGS: usize` resides
249
250 pub const fn extract_receiver_buffer(config: u64) -> u32 {
251 let config = Self::from(config);
252 config.receiver_buffer
253 }
254
255 pub const fn extract_executor_instruments(config: u64) -> usize {
256 let config = Self::from(config);
257 config.executor_instruments.into()
258 }
259
260 pub const fn extract_msg_size_hint(config: u64) -> u32 {
261 let config = Self::from(config);
262 config.msg_size_hint
263 }
264
265 pub const fn extract_graceful_close_timeout(config: u64) -> Duration {
266 let config = Self::from(config);
267 Duration::from_millis(config.flush_timeout_millis as u64)
268 }
269
270 pub const fn extract_retrying_strategy(config: u64) -> RetryingStrategies {
271 let config = Self::from(config);
272 config.retrying_strategy
273 }
274
275 pub const fn extract_socket_options(config: u64) -> SocketOptions {
276 let config = Self::from(config);
277 config.socket_options
278 }
279}
280
281/// Helper for retrieving data (other than simple flags) from the configuration
282/// -- as stored in the specified `bits` by [Self::set_bits()]
283const fn get_bits(config: u64, bits: RangeInclusive<usize>) -> u64 {
284 let bits_len = *bits.end()-*bits.start()+1;
285 (config>>*bits.start()) & ((1<<bits_len)-1)
286}
287
288/// Helper for storing data (other than simple flags) in the configuration
289/// -- stored in the specified `bits`.\
290/// `value` should not be higher than what fits in the bits.\
291/// Returns the `configs` with the `value` applied to it in a way it may be retrieved by [Self::get_bits()]
292const fn set_bits(mut config: u64, bits: RangeInclusive<usize>, value: u64) -> u64 {
293 let bits_len = *bits.end()-*bits.start()+1;
294 if value > (1<<bits_len)-1 {
295 // "The value specified is above the maximum the reserved bits for it can take"
296 unreachable!();
297 } else {
298 config &= !( ((1<<bits_len)-1) << *bits.start() ); // clear the target bits
299 config |= value << *bits.start(); // set them
300 config
301 }
302}
303
304/// Retrieves 5 `bits` from `configs` that represents a power of 2 over the `u32` space
305const fn get_power_of_2_u32_bits(config: u64, bits: RangeInclusive<usize>) -> u32 {
306 let value = get_bits(config, bits);
307 1 << value
308}
309
310/// Packs, optimally, the `power_of_2_u32_value` into 5 `bits`, returning the new value for the given `config`
311const fn set_bits_from_power_of_2_u32(config: u64, bits: RangeInclusive<usize>, power_of_2_u32_value: u32) -> u64 {
312 if power_of_2_u32_value.is_power_of_two() {
313 set_bits(config, bits, power_of_2_u32_value.ilog2() as u64)
314 } else {
315 // "The value must be a power of 2"
316 unreachable!();
317 }
318}
319
320/// Retrieves 4 `bits` from `configs` that represents a power of 2 over the `u16` space
321const fn get_power_of_2_u16_bits(config: u64, bits: RangeInclusive<usize>) -> u16 {
322 let value = get_bits(config, bits);
323 1 << value
324}
325
326/// Packs, optimally, the `power_of_2_u16_value` into 4 `bits`, returning the new value for the given `config`
327const fn set_bits_from_power_of_2_u16(config: u64, bits: RangeInclusive<usize>, power_of_2_u16_value: u16) -> u64 {
328 if power_of_2_u16_value.is_power_of_two() {
329 set_bits(config, bits, power_of_2_u16_value.ilog2() as u64)
330 } else {
331 // "The value must be a power of 2"
332 unreachable!();
333 }
334}
335
336/// Retrieves 3 `bits` from `configs` that represents a power of 2 over the `u8` space
337const fn _get_power_of_3_u8_bits(config: u64, bits: RangeInclusive<usize>) -> u8 {
338 let value = get_bits(config, bits);
339 1 << value
340}
341
342/// Packs, optimally, the `power_of_2_u8_value` into 3 `bits`, returning the new value for the given `config`
343const fn _set_bits_from_power_of_2_u8(config: u64, bits: RangeInclusive<usize>, power_of_2_u8_value: u8) -> u64 {
344 if power_of_2_u8_value.is_power_of_two() {
345 set_bits(config, bits, power_of_2_u8_value.ilog2() as u64)
346 } else {
347 // "The value must be a power of 2"
348 unreachable!();
349 }
350}
351
352// const versions of some `Option<>` functions
353//////////////////////////////////////////////
354
355/// same as Option::<bool>::unwrap_or(false), but const
356const fn unwrap_bool_or_default(option: Option<bool>) -> bool {
357 match option {
358 Some(v) => v,
359 None => false,
360 }
361}
362/// same as Option::<u8>::unwrap_or(0), but const
363const fn _unwrap_u8_or_default(option: Option<u8>) -> u8 {
364 match option {
365 Some(v) => v,
366 None => 0,
367 }
368}
369/// same as Option::<u16>::unwrap_or(0), but const
370const fn _unwrap_u16_or_default(option: Option<u16>) -> u16 {
371 match option {
372 Some(v) => v,
373 None => 0,
374 }
375}
376/// same as Option::<u32>::unwrap_or(0), but const
377const fn _unwrap_u32_or_default(option: Option<u32>) -> u32 {
378 match option {
379 Some(v) => v,
380 None => 0,
381 }
382}
383
384// same as Option::<NonZero*>::map(|v| v.get()).unwrap_or(0), but const
385const fn unwrap_non_zero_u8_or_default(option: Option<NonZeroU8>) -> u8 {
386 match option {
387 Some(v) => v.get(),
388 None => 0,
389 }
390}
391
392
393/// Unit tests & enforces the requisites of the [stream_executor](self) module.\
394/// Tests here mixes manual & automated assertions -- you should manually inspect the output of each one and check if the log outputs make sense
395#[cfg(any(test,doc))]
396mod tests {
397 use super::*;
398
399 #[cfg_attr(not(doc),test)]
400 fn retrying_strategies_repr() {
401 let subjects = vec![
402 vec![
403 RetryingStrategies::DoNotRetry,
404 RetryingStrategies::EndCommunications,
405 ].into_iter(),
406 (0..8).map(|n| RetryingStrategies::RetryWithBackoffUpTo(1<<n)).collect::<Vec<_>>().into_iter(),
407 (0..8).map(|n| RetryingStrategies::RetryYieldingForUpToMillis(1<<n)).collect::<Vec<_>>().into_iter(),
408 (0..8).map(|n| RetryingStrategies::RetrySpinningForUpToMillis(1<<n)).collect::<Vec<_>>().into_iter(),
409 ].into_iter().flatten();
410
411 for expected in subjects {
412 let converted = RetryingStrategies::as_repr(&expected);
413 let reconverted = RetryingStrategies::from_repr(converted);
414 assert_eq!(reconverted, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, converted, reconverted);
415 }
416 }
417
418 #[cfg_attr(not(doc),test)]
419 fn socket_options_repr() {
420 let subjects = vec![
421 vec![
422 SocketOptions { hops_to_live: None, linger_millis: None, no_delay: None},
423 SocketOptions { hops_to_live: None, linger_millis: None, no_delay: Some(false)},
424 SocketOptions { hops_to_live: None, linger_millis: None, no_delay: Some(true)},
425 SocketOptions { hops_to_live: None, linger_millis: Some(1), no_delay: None},
426 SocketOptions { hops_to_live: NonZeroU8::new(1), linger_millis: None, no_delay: None},
427 ].into_iter(),
428 (0..31).map(|n| SocketOptions { hops_to_live: None, linger_millis: Some(1<<n), no_delay: None }).collect::<Vec<_>>().into_iter(),
429 (0..8) .map(|n| SocketOptions { hops_to_live: NonZeroU8::new(1<<n), linger_millis: None, no_delay: None }).collect::<Vec<_>>().into_iter(),
430 ].into_iter().flatten();
431
432 for expected in subjects {
433 let converted = SocketOptions::as_repr(&expected);
434 let reconverted = SocketOptions::from_repr(converted);
435 assert_eq!(reconverted, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, converted, reconverted);
436 }
437 // for expected in subjects {
438 // let converted = SocketOptions::into_repr(&expected);
439 // let reconverted = SocketOptions::from_repr(converted);
440 // println!("{:?}: repr: 0x{:x}; worked? {} ---- {:?}", expected, converted, reconverted==expected, reconverted);
441 // }
442
443 }
444
445 #[cfg_attr(not(doc),test)]
446 fn const_config() {
447 let expected = || ConstConfig {
448 msg_size_hint: 1024,
449 sender_buffer: 2048,
450 receiver_buffer: 2048,
451 flush_timeout_millis: 256,
452 retrying_strategy: RetryingStrategies::RetryWithBackoffUpTo(14),
453 socket_options: SocketOptions { hops_to_live: NonZeroU8::new(255), linger_millis: Some(128), no_delay: Some(true) },
454 // channel: Channels::Atomic,
455 executor_instruments: Instruments::from(Instruments::LogsWithExpensiveMetrics.into()),
456 };
457 let converted = ConstConfig::into(expected());
458 let reconverted = ConstConfig::from(converted);
459 assert_eq!(reconverted, expected(), "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected(), converted, reconverted);
460 }
461
462}